From efcbf639faa68656596d0c2797ab7115f663fff0 Mon Sep 17 00:00:00 2001 From: liuxiao Date: Thu, 7 Jul 2022 14:39:43 +0800 Subject: [PATCH 1/8] feat add flink sql support --- .../factory/NebulaDynamicTableSource.java | 142 +++++------ .../connection/NebulaClientOptions.java | 2 +- .../nebula/sink/NebulaBatchExecutor.java | 2 +- .../nebula/sink/NebulaBatchOutputFormat.java | 32 +-- .../nebula/sink/NebulaEdgeBatchExecutor.java | 4 +- .../sink/NebulaVertexBatchExecutor.java | 5 +- .../nebula/source/NebulaInputFormat.java | 7 +- .../table/NebulaDynamicTableFactory.java | 140 ++++++++++- .../nebula/table/NebulaDynamicTableSink.java | 78 +++--- .../table/NebulaDynamicTableSource.java | 66 ++--- .../nebula/table/NebulaRowDataConverter.java | 229 ++++++++++++++++++ .../table/NebulaRowDataEdgeBatchExecutor.java | 47 ++++ .../table/NebulaRowDataInputFormat.java | 95 +++----- .../table/NebulaRowDataOutputFormat.java | 49 ++++ .../NebulaRowDataVertexBatchExecutor.java | 51 ++++ .../connector/nebula/utils/WriteModeEnum.java | 11 + .../org.apache.flink.table.factories.Factory | 6 + .../nebula/table/NebulaTableTest.java | 192 +++++++++++++++ 18 files changed, 929 insertions(+), 229 deletions(-) create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java create mode 100644 connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java index 6a21cbd..3d2ff0b 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java @@ -1,71 +1,71 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -package org.apache.flink.connector.nebula.catalog.factory; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; -import org.apache.flink.connector.nebula.statement.ExecutionOptions; -import org.apache.flink.connector.nebula.table.NebulaRowDataInputFormat; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.InputFormatProvider; -import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import org.apache.flink.table.data.RowData; - -public class NebulaDynamicTableSource implements ScanTableSource, LookupTableSource, - SupportsProjectionPushDown { - - private final NebulaMetaConnectionProvider metaProvider; - private final ExecutionOptions executionOptions; - - public NebulaDynamicTableSource(NebulaMetaConnectionProvider metaProvider, - ExecutionOptions executionOptions) { - this.metaProvider = metaProvider; - this.executionOptions = executionOptions; - } - - - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - return null; - } - - @Override - public ChangelogMode getChangelogMode() { - return ChangelogMode.insertOnly(); - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - return InputFormatProvider.of(getInputFormat(metaProvider)); - } - - @Override - public DynamicTableSource copy() { - return new NebulaDynamicTableSource(metaProvider, executionOptions); - } - - @Override - public String asSummaryString() { - return "Nebula"; - } - - @Override - public boolean supportsNestedProjection() { - return false; - } - - @Override - public void applyProjection(int[][] projectedFields) { - - } - - private InputFormat getInputFormat(NebulaMetaConnectionProvider metaProvider) { - return new NebulaRowDataInputFormat(); - } -} +// /* Copyright (c) 2020 vesoft inc. All rights reserved. +// * +// * This source code is licensed under Apache 2.0 License. 
+// */ +// +// package org.apache.flink.connector.nebula.catalog.factory; +// +// import org.apache.flink.api.common.io.InputFormat; +// import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; +// import org.apache.flink.connector.nebula.statement.ExecutionOptions; +// import org.apache.flink.connector.nebula.table.NebulaRowDataInputFormat; +// import org.apache.flink.table.connector.ChangelogMode; +// import org.apache.flink.table.connector.source.DynamicTableSource; +// import org.apache.flink.table.connector.source.InputFormatProvider; +// import org.apache.flink.table.connector.source.LookupTableSource; +// import org.apache.flink.table.connector.source.ScanTableSource; +// import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +// import org.apache.flink.table.data.RowData; +// +// public class NebulaDynamicTableSource implements ScanTableSource, LookupTableSource, +// SupportsProjectionPushDown { +// +// private final NebulaMetaConnectionProvider metaProvider; +// private final ExecutionOptions executionOptions; +// +// public NebulaDynamicTableSource(NebulaMetaConnectionProvider metaProvider, +// ExecutionOptions executionOptions) { +// this.metaProvider = metaProvider; +// this.executionOptions = executionOptions; +// } +// +// +// @Override +// public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { +// return null; +// } +// +// @Override +// public ChangelogMode getChangelogMode() { +// return ChangelogMode.insertOnly(); +// } +// +// @Override +// public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { +// return InputFormatProvider.of(getInputFormat(metaProvider)); +// } +// +// @Override +// public DynamicTableSource copy() { +// return new NebulaDynamicTableSource(metaProvider, executionOptions); +// } +// +// @Override +// public String asSummaryString() { +// return "Nebula"; +// } +// +// @Override +// public boolean supportsNestedProjection() { +// return false; +// } +// +// @Override +// public void applyProjection(int[][] projectedFields) { +// +// } +// +// private InputFormat getInputFormat(NebulaMetaConnectionProvider metaProvider) { +// return new NebulaRowDataInputFormat(); +// } +// } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java index 3fa9d3f..402d5cd 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java @@ -221,7 +221,7 @@ public NebulaClientOptions build() { case SELF: if (selfSignParams == null) { throw new IllegalArgumentException("ssl is enabled and sign type is " - + "CA, selfSignParam must not be null"); + + "SELF, selfSignParam must not be null"); } break; default: diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java index a787644..845175e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java @@ -28,7 +28,7 @@ public NebulaBatchExecutor(ExecutionOptions executionOptions, * * @param record represent vertex or edge */ - abstract void addToBatch(T record); + protected 
abstract void addToBatch(T record); /** * execute the insert statement diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java index bb2646e..2bc63c7 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java @@ -42,11 +42,11 @@ public class NebulaBatchOutputFormat extends RichOutputFormat implements F private volatile AtomicLong numPendingRow; private NebulaPool nebulaPool; private Session session; - private MetaClient metaClient; - private NebulaBatchExecutor nebulaBatchExecutor; + protected MetaClient metaClient; + private NebulaBatchExecutor nebulaBatchExecutor; private NebulaGraphConnectionProvider graphProvider; - private NebulaMetaConnectionProvider metaProvider; - private ExecutionOptions executionOptions; + protected NebulaMetaConnectionProvider metaProvider; + protected ExecutionOptions executionOptions; private List errorBuffer = new ArrayList<>(); private transient ScheduledExecutorService scheduler; @@ -100,16 +100,7 @@ public void open(int i, int i1) throws IOException { VidTypeEnum vidType = metaProvider.getVidType(metaClient, executionOptions.getGraphSpace()); boolean isVertex = executionOptions.getDataType().isVertex(); - Map schema; - if (isVertex) { - schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(), - executionOptions.getLabel()); - nebulaBatchExecutor = new NebulaVertexBatchExecutor(executionOptions, vidType, schema); - } else { - schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(), - executionOptions.getLabel()); - nebulaBatchExecutor = new NebulaEdgeBatchExecutor(executionOptions, vidType, schema); - } + nebulaBatchExecutor = getBatchExecutor(vidType, isVertex); // start the schedule task: submit the buffer records every batchInterval. // If batchIntervalMs is 0, do not start the scheduler task. 
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatch() != 1) { @@ -127,6 +118,19 @@ public void open(int i, int i1) throws IOException { } } + protected NebulaBatchExecutor getBatchExecutor(VidTypeEnum vidType, boolean isVertex) { + Map schema; + if (isVertex) { + schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(), + executionOptions.getLabel()); + return new NebulaVertexBatchExecutor<>(executionOptions, vidType, schema); + } else { + schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(), + executionOptions.getLabel()); + return new NebulaEdgeBatchExecutor<>(executionOptions, vidType, schema); + } + } + /** * write one record to buffer */ diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java index 1e23857..8a545cf 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -21,7 +21,7 @@ public class NebulaEdgeBatchExecutor extends NebulaBatchExecutor { private static final Logger LOG = LoggerFactory.getLogger(NebulaEdgeBatchExecutor.class); - private final List nebulaEdgeList; + protected final List nebulaEdgeList; public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions, VidTypeEnum vidType, Map schema) { @@ -33,7 +33,7 @@ public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions, * put record into buffer */ @Override - void addToBatch(T record) { + protected void addToBatch(T record) { NebulaRowEdgeOutputFormatConverter converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions, vidType, schema); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java index 8d06280..bef94d8 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -22,7 +22,7 @@ public class NebulaVertexBatchExecutor extends NebulaBatchExecutor { private static final Logger LOG = LoggerFactory.getLogger(NebulaVertexBatchExecutor.class); - private final List nebulaVertexList; + protected final List nebulaVertexList; public NebulaVertexBatchExecutor(ExecutionOptions executionOptions, VidTypeEnum vidType, Map schema) { @@ -36,7 +36,7 @@ public NebulaVertexBatchExecutor(ExecutionOptions executionOptions, * @param record represent vertex or edge */ @Override - void addToBatch(T record) { + protected void addToBatch(T record) { NebulaRowVertexOutputFormatConverter converter = new NebulaRowVertexOutputFormatConverter( (VertexExecutionOptions) executionOptions, vidType, schema); NebulaVertex vertex = converter.createVertex((Row) record, executionOptions.getPolicy()); @@ -90,5 +90,4 @@ String executeBatch(Session session) { nebulaVertexList.clear(); return null; } - } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java index c20e8bb..913d4a1 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java +++ 
b/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java @@ -36,7 +36,7 @@ * @see NebulaStorageConnectionProvider * @see ExecutionOptions */ -abstract class NebulaInputFormat extends RichInputFormat { +public abstract class NebulaInputFormat extends RichInputFormat { protected static final Logger LOG = LoggerFactory.getLogger(NebulaInputFormat.class); private static final long serialVersionUID = 902031944252613459L; @@ -151,7 +151,7 @@ public T nextRecord(T reuse) throws IOException { if (!hasNext) { return null; } - LOG.info("nextRecord: {}", times++); + LOG.info("source nextRecord: {}", times++); BaseTableRow row = nebulaSource.next(); try { @@ -169,9 +169,8 @@ public void close() { LOG.info("Closing split (scanned {} rows)", scannedRows); } - public NebulaInputFormat setExecutionOptions(ExecutionOptions executionOptions) { + public NebulaInputFormat setExecutionOptions(ExecutionOptions executionOptions) { this.executionOptions = executionOptions; return this; } - } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java index d8df665..7509e4b 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java @@ -5,15 +5,25 @@ package org.apache.flink.connector.nebula.table; -import java.util.HashSet; -import java.util.Set; +import java.util.*; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.nebula.connection.NebulaClientOptions; +import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.DataTypeEnum; import org.apache.flink.connector.nebula.utils.NebulaConstant; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; public class NebulaDynamicTableFactory implements DynamicTableSourceFactory, @@ -66,34 +76,133 @@ public class NebulaDynamicTableFactory implements DynamicTableSourceFactory, .key("connect-retry") .intType() .defaultValue(NebulaConstant.DEFAULT_CONNECT_RETRY) - .withDescription("the nebula connect retry times"); + .withDescription("the nebula connect retry times."); public static final ConfigOption TIMEOUT = ConfigOptions .key("timeout") .intType() .defaultValue(NebulaConstant.DEFAULT_TIMEOUT_MS) - .withDescription("the nebula execute timeout duration"); + .withDescription("the nebula execute timeout duration."); public static final ConfigOption EXECUTE_RETRY = ConfigOptions .key("execute-retry") .intType() .defaultValue(NebulaConstant.DEFAULT_EXECUTION_RETRY) - .withDescription("the nebula execute retry times"); + .withDescription("the nebula execute retry times."); + public static 
final ConfigOption WRITE_MODE = ConfigOptions + .key("write-mode") + .stringType() + .defaultValue("INSERT") + .withDescription("the nebula graph write mode."); + + public static final ConfigOption SRC_INDEX = ConfigOptions + .key("src-index") + .intType() + .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX) + .withDescription("the nebula edge src index."); + + public static final ConfigOption DST_INDEX = ConfigOptions + .key("dst-index") + .intType() + .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX) + .withDescription("the nebula edge dst index."); + + public static final ConfigOption RANK_INDEX = ConfigOptions + .key("rank-index") + .intType() + .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX) + .withDescription("the nebula edge rank index."); @Override - public DynamicTableSink createDynamicTableSink(Context context) { - return new NebulaDynamicTableSink(METAADDRESS.key(), GRAPHADDRESS.key(), USERNAME.key(), - PASSWORD.key()); + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig readableConfig = helper.getOptions(); + helper.validate(); + validateConfigOptions(readableConfig); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + ExecutionOptions executionOptions = + getExecutionOptions(readableConfig, physicalSchema, context); + NebulaClientOptions nebulaClientOptions = getNebulaClientOptions(readableConfig); + return new NebulaDynamicTableSource(nebulaClientOptions, executionOptions, physicalSchema); } @Override - public DynamicTableSource createDynamicTableSource(Context context) { - String address = METAADDRESS.key(); - String username = USERNAME.key(); - String password = PASSWORD.key(); + public DynamicTableSink createDynamicTableSink(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig readableConfig = helper.getOptions(); + helper.validate(); + validateConfigOptions(readableConfig); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + ExecutionOptions executionOptions = + getExecutionOptions(readableConfig, physicalSchema, context); + NebulaClientOptions nebulaClientOptions = getNebulaClientOptions(readableConfig); + return new NebulaDynamicTableSink(nebulaClientOptions, executionOptions, physicalSchema); + } + + private ExecutionOptions getExecutionOptions(ReadableConfig readableConfig, + TableSchema physicalSchema, Context context) { + String[] fieldNames = physicalSchema.getFieldNames(); + List fieldList = new ArrayList<>(); + List positionList = new ArrayList<>(); + String objectName = context.getObjectIdentifier().getObjectName(); + String[] typeAndLabel = objectName.split(NebulaConstant.SPLIT_POINT); + String type = typeAndLabel[0]; + WriteModeEnum writeMode = WriteModeEnum.chooseWriteMode(readableConfig.get(WRITE_MODE)); + + ExecutionOptions executionOptions; + if (DataTypeEnum.VERTEX.name().equals(type)) { + for (int i = 1; i < fieldNames.length; i++) { + fieldList.add(fieldNames[i]); + positionList.add(i); + } + executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace(readableConfig.get(GRAPH_SPACE)) + .setTag(readableConfig.get(LABEL_NAME)) + .setIdIndex(0) + .setFields(fieldList) + .setPositions(positionList) + .setWriteMode(writeMode) + .builder(); + } else { + for (int i = 3; i < 
fieldNames.length; i++) { + fieldList.add(fieldNames[i]); + positionList.add(i); + } + executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace(readableConfig.get(GRAPH_SPACE)) + .setEdge(readableConfig.get(LABEL_NAME)) + .setSrcIndex(readableConfig.get(SRC_INDEX)) + .setDstIndex(readableConfig.get(DST_INDEX)) + .setRankIndex(readableConfig.get(RANK_INDEX)) + .setFields(fieldList) + .setPositions(positionList) + .setWriteMode(writeMode) + .builder(); + } + return executionOptions; + } + + private NebulaClientOptions getNebulaClientOptions(ReadableConfig readableConfig) { + return new NebulaClientOptions.NebulaClientOptionsBuilder() + .setMetaAddress(readableConfig.get(METAADDRESS)) + .setGraphAddress(readableConfig.get(GRAPHADDRESS)) + .setUsername(readableConfig.get(USERNAME)) + .setPassword(readableConfig.get(PASSWORD)) + .build(); + } - return new NebulaDynamicTableSource(address, username, password); + private void validateConfigOptions(ReadableConfig readableConfig) { + String writeMode = readableConfig.get(WRITE_MODE); + if (!WriteModeEnum.checkValidWriteMode(writeMode)) { + throw new IllegalArgumentException( + String.format("Unknown sink.write-mode `%s`", writeMode) + ); + } } @Override @@ -114,10 +223,15 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { Set> set = new HashSet<>(); + set.add(GRAPH_SPACE); + set.add(LABEL_NAME); set.add(CONNECT_TIMEOUT); set.add(CONNECT_RETRY); set.add(TIMEOUT); set.add(EXECUTE_RETRY); + set.add(SRC_INDEX); + set.add(DST_INDEX); + set.add(RANK_INDEX); return set; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java index f42ec9b..c7d326e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java @@ -3,28 +3,28 @@ import org.apache.flink.connector.nebula.connection.NebulaClientOptions; import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; -import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat; -import org.apache.flink.connector.nebula.sink.NebulaSinkFunction; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.connector.sink.OutputFormatProvider; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.RowKind; -public class NebulaDynamicTableSink implements DynamicTableSink { - private final String metaAddress; - private final String graphAddress; - +import java.util.Arrays; - private final String username; - private final String password; +public class NebulaDynamicTableSink implements DynamicTableSink { + private final NebulaClientOptions nebulaClientOptions; + private final ExecutionOptions executionOptions; + private final TableSchema tableSchema; - public NebulaDynamicTableSink(String metaAddress, String graphAddress, String username, - String password) { - this.metaAddress = 
metaAddress; - this.graphAddress = graphAddress; - this.username = username; - this.password = password; + public NebulaDynamicTableSink(NebulaClientOptions nebulaClientOptions, + ExecutionOptions executionOptions, + TableSchema tableSchema) { + this.nebulaClientOptions = nebulaClientOptions; + this.executionOptions = executionOptions; + this.tableSchema = tableSchema; } @Override @@ -38,31 +38,45 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { return builder.build(); } + // @Override + // public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + // DataStructureConverter converter = + // context.createDataStructureConverter(tableSchema.toPhysicalRowDataType()); + // NebulaGraphConnectionProvider graphProvider = + // new NebulaGraphConnectionProvider(nebulaClientOptions); + // NebulaMetaConnectionProvider metaProvider = + // new NebulaMetaConnectionProvider(nebulaClientOptions); + // NebulaBatchOutputFormat outPutFormat = + // new NebulaRowDataOutputFormat(graphProvider, metaProvider, converter); + // outPutFormat.setExecutionOptions(executionOptions); + // NebulaSinkFunction sinkFunction = new NebulaSinkFunction<>(outPutFormat); + // return SinkFunctionProvider.of(sinkFunction); + // // return OutputFormatProvider.of(outPutFormat); + // } + @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - - NebulaClientOptions builder = new NebulaClientOptions.NebulaClientOptionsBuilder() - .setMetaAddress(metaAddress) - .setGraphAddress(graphAddress) - .setUsername(username) - .setPassword(password) - .build(); - - NebulaGraphConnectionProvider graphProvider = new NebulaGraphConnectionProvider(builder); - NebulaMetaConnectionProvider metaProvider = new NebulaMetaConnectionProvider(builder); - NebulaBatchOutputFormat outPutFormat = new NebulaBatchOutputFormat(graphProvider, - metaProvider); - NebulaSinkFunction sinkFunction = new NebulaSinkFunction<>(outPutFormat); - return SinkFunctionProvider.of(sinkFunction); + DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); + LogicalType[] logicalTypes = Arrays.stream(fieldDataTypes) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new); + NebulaGraphConnectionProvider graphProvider = + new NebulaGraphConnectionProvider(nebulaClientOptions); + NebulaMetaConnectionProvider metaProvider = + new NebulaMetaConnectionProvider(nebulaClientOptions); + NebulaRowDataOutputFormat outPutFormat = + new NebulaRowDataOutputFormat(graphProvider, metaProvider, logicalTypes); + outPutFormat.setExecutionOptions(executionOptions); + return OutputFormatProvider.of(outPutFormat); } @Override public DynamicTableSink copy() { - return null; + return new NebulaDynamicTableSink(nebulaClientOptions, executionOptions, tableSchema); } @Override public String asSummaryString() { - return null; + return "NebulaDynamicTableSink"; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java index 20affde..b33e326 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java @@ -5,39 +5,34 @@ package org.apache.flink.connector.nebula.table; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.connector.nebula.connection.NebulaClientOptions; +import 
org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; -public class NebulaDynamicTableSource implements ScanTableSource, LookupTableSource, - SupportsProjectionPushDown { +import java.util.Arrays; - private final String address; - private final String username; - private final String password; +public class NebulaDynamicTableSource implements ScanTableSource { - public NebulaDynamicTableSource(String address, String username, String password) { - this.address = address; - this.username = username; - this.password = password; - } - - @Override - public DynamicTableSource copy() { - return new NebulaDynamicTableSource(address, username, password); - } - - @Override - public String asSummaryString() { - return "Nebula"; - } - - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { + private final NebulaClientOptions nebulaClientOptions; + private final ExecutionOptions executionOptions; + private final TableSchema tableSchema; - return null; + public NebulaDynamicTableSource(NebulaClientOptions nebulaClientOptions, + ExecutionOptions executionOptions, + TableSchema tableSchema) { + this.nebulaClientOptions = nebulaClientOptions; + this.executionOptions = executionOptions; + this.tableSchema = tableSchema; } @Override @@ -47,17 +42,26 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); + LogicalType[] logicalTypes = Arrays.stream(fieldDataTypes) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new); - return null; + InputFormat inputFormat = new NebulaRowDataInputFormat( + new NebulaStorageConnectionProvider(this.nebulaClientOptions), + this.executionOptions, + logicalTypes + ); + return InputFormatProvider.of(inputFormat); } @Override - public boolean supportsNestedProjection() { - return false; + public DynamicTableSource copy() { + return new NebulaDynamicTableSource(nebulaClientOptions, executionOptions, tableSchema); } @Override - public void applyProjection(int[][] projectedFields) { - + public String asSummaryString() { + return "NebulaDynamicTableSource"; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java new file mode 100644 index 0000000..4130e9f --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java @@ -0,0 +1,229 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +package org.apache.flink.connector.nebula.table; + +import com.vesoft.nebula.client.graph.data.DateTimeWrapper; +import com.vesoft.nebula.client.graph.data.DateWrapper; +import com.vesoft.nebula.client.graph.data.TimeWrapper; +import com.vesoft.nebula.client.graph.data.ValueWrapper; +import com.vesoft.nebula.client.storage.data.BaseTableRow; +import org.apache.flink.connector.nebula.source.NebulaConverter; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.sql.Date; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * convert nebula {@link BaseTableRow} to flink {@link RowData} + */ +public class NebulaRowDataConverter implements NebulaConverter { + + private final RowType rowType; + private final NebulaDeserializationConverter[] toInternalConverters; + private final NebulaSerializationConverter[] toExternalConverters; + private final LogicalType[] fieldTypes; + + public NebulaRowDataConverter(RowType rowType) { + this.rowType = checkNotNull(rowType); + this.fieldTypes = rowType.getFields().stream() + .map(RowType.RowField::getType) + .toArray(LogicalType[]::new); + this.toInternalConverters = new NebulaDeserializationConverter[rowType.getFieldCount()]; + this.toExternalConverters = new NebulaSerializationConverter[rowType.getFieldCount()]; + + for (int i = 0; i < rowType.getFieldCount(); i++) { + this.toInternalConverters[i] = createInternalConverter(fieldTypes[i]); + this.toExternalConverters[i] = createExternalConverter(fieldTypes[i]); + } + } + + @Override + public RowData convert(BaseTableRow record) throws UnsupportedEncodingException { + List values = record.getValues(); + GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); + for (int pos = 0; pos < rowType.getFieldCount(); pos++) { + ValueWrapper valueWrapper = values.get(pos); + if (valueWrapper != null) { + try { + genericRowData.setField(pos, toInternalConverters[pos].deserialize(valueWrapper)); + } catch (SQLException e) { + e.printStackTrace(); + } + } else { + genericRowData.setField(pos, null); + } + } + return genericRowData; + } + + public Row toExternal(RowData rowData) throws SQLException { + Row row = new Row(rowData.getArity()); + for(int i = 0; i < rowData.getArity(); i++) { + if (!rowData.isNullAt(i)) { + toExternalConverters[i].serialize(rowData, i, row); + } else { + row.setField(i, null); + } + } + return row; + } + + /** + * Runtime converter to convert Nebula BaseTableRow to {@link RowData} type object. + */ + @FunctionalInterface + interface NebulaDeserializationConverter extends Serializable { + /** + * Convert a Nebula DataStructure of {@link BaseTableRow} to the internal data structure object. 
+ * + * @param baseTableRow + */ + Object deserialize(ValueWrapper baseTableRow) throws SQLException, UnsupportedEncodingException; + } + + @FunctionalInterface + interface NebulaSerializationConverter extends Serializable { + /** + * Convert a internal field to java object and fill into the Row. + */ + void serialize(RowData rowData, int index, Row row) throws SQLException; + } + + private NebulaDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case BOOLEAN: + return ValueWrapper::asBoolean; + case TINYINT: + case SMALLINT: + case INTEGER: + return val -> (int) val.asLong(); + case BIGINT: + return ValueWrapper::asLong; + case FLOAT: + case DOUBLE: + return ValueWrapper::asDouble; + case CHAR: + case VARCHAR: + return val -> val.isGeography() ? StringData.fromString( + val.asGeography().toString()) + : StringData.fromString(val.asString()); + case DATE: + return val -> { + DateWrapper dateWrapper = val.asDate(); + Date date = Date.valueOf(dateWrapper.toString()); + return (int) date.toLocalDate().toEpochDay(); + }; + case TIME_WITHOUT_TIME_ZONE: + return val -> { + TimeWrapper t = val.asTime(); + LocalTime localTime = LocalTime.of( + t.getHour(), t.getMinute(), t.getSecond()); + Time time = Time.valueOf(localTime); + return (int)(time.toLocalTime().toNanoOfDay() / 1_000_000L); + }; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return val -> { + if (val.isDateTime()) { + DateTimeWrapper t = val.asDateTime(); + LocalDateTime localDateTime = LocalDateTime.of(t.getYear(), t.getMonth(), t.getDay(), + t.getHour(), t.getMinute(), t.getSecond()); + return TimestampData.fromLocalDateTime(localDateTime); + } else { + return TimestampData.fromTimestamp(new Timestamp(val.asLong() * 1000)); + } + }; + case BINARY: + case ARRAY: + case ROW: + case MAP: + case MULTISET: + case RAW: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + case VARBINARY: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + private NebulaSerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return (val, idx, row) -> row.setField(idx, val.getBoolean(idx)); + case TINYINT: + return (val, idx, row) -> row.setField(idx, val.getByte(idx)); + case SMALLINT: + return (val, idx, row) -> row.setField(idx, val.getShort(idx)); + case INTEGER: + return (val, idx, row) -> row.setField(idx, val.getInt(idx)); + case BIGINT: + return (val, idx, row) -> row.setField(idx, val.getLong(idx)); + case FLOAT: + return (val, idx, row) -> row.setField(idx, val.getFloat(idx)); + case DOUBLE: + return (val, idx, row) -> row.setField(idx, val.getDouble(idx)); + case CHAR: + case VARCHAR: + return (val, idx, row) -> row.setField(idx, val.getString(idx).toString()); + case DATE: + return (val, idx, row) -> row.setField(idx, + Date.valueOf(LocalDate.ofEpochDay(val.getInt(idx)))); + case TIME_WITHOUT_TIME_ZONE: + return (val, idx, row) -> { + LocalTime localTime = LocalTime.ofNanoOfDay(val.getInt(idx) * 1_000_000L); + row.setField(idx, localTime.toString()); + }; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timeStampPrecision = + ((TimestampType) type).getPrecision(); + return (val, idx, row) -> { + row.setField(idx, Timestamp.from( + val.getTimestamp(idx, timeStampPrecision).toInstant())); + }; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int 
localZonedTimeStampPrecision = + ((LocalZonedTimestampType) type).getPrecision(); + return (val, idx, row) -> { + row.setField(idx, Timestamp.from( + val.getTimestamp(idx, localZonedTimeStampPrecision).toInstant())); + }; + case BINARY: + case ARRAY: + case ROW: + case MAP: + case MULTISET: + case RAW: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + case VARBINARY: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java new file mode 100644 index 0000000..046a6c8 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java @@ -0,0 +1,47 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.table; + +import org.apache.flink.connector.nebula.sink.NebulaEdgeBatchExecutor; +import org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter; +import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaEdge; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.table.data.RowData; + +import java.sql.SQLException; +import java.util.Map; + +public class NebulaRowDataEdgeBatchExecutor extends NebulaEdgeBatchExecutor { + + private final NebulaRowDataConverter nebulaConverter; + + public NebulaRowDataEdgeBatchExecutor(ExecutionOptions executionOptions, + VidTypeEnum vidType, Map schema, + NebulaRowDataConverter nebulaConverter) { + super(executionOptions, vidType, schema); + this.nebulaConverter = nebulaConverter; + } + + @Override + protected void addToBatch(RowData record) { + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions, + vidType, schema); + NebulaEdge edge = null; + try { + edge = converter.createEdge(this.nebulaConverter.toExternal(record), executionOptions.getPolicy()); + } catch (SQLException e) { + e.printStackTrace(); + } + if (edge == null) { + return; + } + nebulaEdgeList.add(edge); + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java index 0f5916d..bb27e2e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java @@ -1,64 +1,45 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + package org.apache.flink.connector.nebula.table; import java.io.IOException; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; + +import com.vesoft.nebula.client.storage.data.BaseTableRow; +import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider; +import org.apache.flink.connector.nebula.source.NebulaInputFormat; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.table.data.RowData; - -public class NebulaRowDataInputFormat extends RichInputFormat { - - @Override - public void openInputFormat() throws IOException { - super.openInputFormat(); - } - - @Override - public void closeInputFormat() throws IOException { - super.closeInputFormat(); - } - - @Override - public void configure(Configuration parameters) { - - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return null; - } - - @Override - public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - return new GenericInputSplit[]{new GenericInputSplit(0, 1)}; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - @Override - public void open(InputSplit split) throws IOException { - - } - - @Override - public boolean reachedEnd() throws IOException { - return false; - } - - @Override - public RowData nextRecord(RowData reuse) throws IOException { - return null; - } - - @Override - public void close() throws IOException { - +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +/** + * implementation of NebulaInputFormat. + * Read NebulaGraph data in nebula's {@link BaseTableRow} format. + * how to use: + * NebulaInputTableRowFormat inputFormat = new NebulaInputTableRowFormat + * (storageConnectionProvider, vertexExecutionOptions); + * DataSource dataSource = env.createInput(inputFormat); + * + */ +public class NebulaRowDataInputFormat extends NebulaInputFormat { + + private final LogicalType[] logicalTypes; + + public NebulaRowDataInputFormat(NebulaStorageConnectionProvider storageConnectionProvider, + ExecutionOptions executionOptions, + LogicalType[] logicalTypes) { + super(storageConnectionProvider, executionOptions); + this.logicalTypes = logicalTypes; + } + + @Override + public void open(InputSplit inputSplit) throws IOException { + super.open(inputSplit); + RowType rowType = RowType.of(logicalTypes); + super.nebulaConverter = new NebulaRowDataConverter(rowType); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java new file mode 100644 index 0000000..162acaf --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java @@ -0,0 +1,49 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +package org.apache.flink.connector.nebula.table; + +import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; +import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; +import org.apache.flink.connector.nebula.sink.NebulaBatchExecutor; +import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Map; + +public class NebulaRowDataOutputFormat extends NebulaBatchOutputFormat { + + private final LogicalType[] logicalTypes; + + public NebulaRowDataOutputFormat(NebulaGraphConnectionProvider graphProvider, + NebulaMetaConnectionProvider metaProvider, + LogicalType[] logicalTypes) { + super(graphProvider, metaProvider); + this.logicalTypes = logicalTypes; + } + + @Override + protected NebulaBatchExecutor getBatchExecutor(VidTypeEnum vidType, boolean isVertex) { + RowType rowType = RowType.of(logicalTypes); + NebulaRowDataConverter nebulaConverter = new NebulaRowDataConverter(rowType); + Map schema; + NebulaBatchExecutor nebulaBatchExecutor = null; + if (isVertex) { + schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(), + executionOptions.getLabel()); + nebulaBatchExecutor = new NebulaRowDataVertexBatchExecutor( + executionOptions, vidType, schema, nebulaConverter); + } else { + schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(), + executionOptions.getLabel()); + nebulaBatchExecutor = new NebulaRowDataEdgeBatchExecutor( + executionOptions, vidType, schema, nebulaConverter); + } + return nebulaBatchExecutor; + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java new file mode 100644 index 0000000..7eb4272 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java @@ -0,0 +1,51 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +package org.apache.flink.connector.nebula.table; + + +import org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter; +import org.apache.flink.connector.nebula.sink.NebulaVertexBatchExecutor; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaVertex; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.table.data.RowData; + +import java.sql.SQLException; +import java.util.Map; + +public class NebulaRowDataVertexBatchExecutor extends NebulaVertexBatchExecutor { + private final NebulaRowDataConverter nebulaConverter; + + public NebulaRowDataVertexBatchExecutor(ExecutionOptions executionOptions, + VidTypeEnum vidType, + Map schema, + NebulaRowDataConverter nebulaConverter) { + super(executionOptions, vidType, schema); + this.nebulaConverter = nebulaConverter; + } + + /** + * put record into buffer + * + * @param record represent vertex or edge + */ + @Override + protected void addToBatch(RowData record) { + NebulaRowVertexOutputFormatConverter converter = new NebulaRowVertexOutputFormatConverter( + (VertexExecutionOptions) executionOptions, vidType, schema); + NebulaVertex vertex = null; + try { + vertex = converter.createVertex(this.nebulaConverter.toExternal(record), executionOptions.getPolicy()); + } catch (SQLException e) { + e.printStackTrace(); + } + if (vertex == null) { + return; + } + nebulaVertexList.add(vertex); + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java index 9b7b7a7..7807fba 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java @@ -26,4 +26,15 @@ public enum WriteModeEnum { WriteModeEnum(String mode) { this.mode = mode; } + + public static boolean checkValidWriteMode(String modeName) { + return chooseWriteMode(modeName) != INSERT || + INSERT.name().equalsIgnoreCase(modeName); + } + + public static WriteModeEnum chooseWriteMode(String modeName) { + if (UPDATE.name().equalsIgnoreCase(modeName)) return UPDATE; + if (DELETE.name().equalsIgnoreCase(modeName)) return DELETE; + return INSERT; + } } diff --git a/connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000..158de3f --- /dev/null +++ b/connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,6 @@ +# Copyright (c) 2020 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. 
+# + +org.apache.flink.connector.nebula.table.NebulaDynamicTableFactory diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java new file mode 100644 index 0000000..2aa93d9 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java @@ -0,0 +1,192 @@ +package org.apache.flink.connector.nebula.table; + + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * make sure your environment has created the space, and data has been inserted into this space. + * Space schema: + * + *
"CREATE SPACE `testFlinkSource` (partition_num = 100, replica_factor = 3, charset = utf8, + * collate = utf8_bin, vid_type = INT64, atomic_edge = false)" + * + *
"USE `testFlinkSource`" + * + *
"CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, + * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, + * col12 float, col13 time, col14 geography);" + * + *
"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, + * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, + * col12 float, col13 time, col14 geography);" + * + *
"CREATE SPACE `testFlinkSink` (partition_num = 100, replica_factor = 3, charset = utf8, + * collate = utf8_bin, vid_type = INT64, atomic_edge = false)" + * + *
"USE `testFlinkSink`" + * + *
"CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, + * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, + * col12 float, col13 time, col14 geography);" + * + *
"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, + * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, + * col12 float, col13 time, col14 geography);" + */ +public class NebulaTableTest { + private static final Logger log = LoggerFactory.getLogger(NebulaTableTest.class); + + @Test + public void testVertexTransfer() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .useBlinkPlanner() + .build(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + + String creatSourceDDL = "CREATE TABLE `VERTEX.personSource` (" + + " vid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '192.168.200.135:9559'," + + " 'graph-address' = '192.168.200.135:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'graph-space' = 'testFlinkSource'," + + " 'label-name' = 'person'" + + ")"; + tableEnv.executeSql(creatSourceDDL); + + String creatSinkDDL = "CREATE TABLE `VERTEX.personSink` (" + + " vid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '192.168.200.135:9559'," + + " 'graph-address' = '192.168.200.135:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'graph-space' = 'testFlinkSink'," + + " 'label-name' = 'person'" + + ")"; + tableEnv.executeSql(creatSinkDDL); + + Table table = tableEnv.sqlQuery("SELECT * FROM `VERTEX.personSource`"); + table.executeInsert("`VERTEX.personSink`"); + } + + @Test + public void testEdgeTransfer() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .useBlinkPlanner() + .build(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + + String creatSourceDDL = "CREATE TABLE `EDGE.friendSource` (" + + " sid BIGINT," + + " eid BIGINT," + + " rid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '192.168.200.135:9559'," + + " 'graph-address' = '192.168.200.135:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'graph-space' = 'testFlinkSource'," + + " 'label-name' = 'friend'," + + " 'src-index' = '0'," + + " 'dst-index' = '1'," + + " 'rank-index' = '2'" + + ")"; + tableEnv.executeSql(creatSourceDDL); + + String creatSinkDDL = "CREATE TABLE `EDGE.friendSink` (" + + " sid BIGINT," + + " eid BIGINT," + + " rid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 
INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '192.168.200.135:9559'," + + " 'graph-address' = '192.168.200.135:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'graph-space' = 'testFlinkSink'," + + " 'label-name' = 'friend'," + + " 'src-index' = '0'," + + " 'dst-index' = '1'," + + " 'rank-index' = '2'" + + ")"; + tableEnv.executeSql(creatSinkDDL); + + Table table = tableEnv.sqlQuery("SELECT * FROM `EDGE.friendSource`"); + table.executeInsert("`EDGE.friendSink`"); + } +} From 96aff74f2d7d571ae11e6e408cf2cb92379cf641 Mon Sep 17 00:00:00 2001 From: liuxiao Date: Thu, 7 Jul 2022 17:31:33 +0800 Subject: [PATCH 2/8] feat add flink sql support --- .../factory/NebulaDynamicTableSource.java | 71 ------------------- .../connection/NebulaClientOptions.java | 2 +- .../table/NebulaDynamicTableFactory.java | 6 +- .../nebula/table/NebulaDynamicTableSink.java | 24 ++----- .../table/NebulaDynamicTableSource.java | 4 +- .../nebula/table/NebulaRowDataConverter.java | 43 +++++------ .../table/NebulaRowDataEdgeBatchExecutor.java | 8 ++- .../table/NebulaRowDataInputFormat.java | 3 +- .../table/NebulaRowDataOutputFormat.java | 7 +- .../NebulaRowDataVertexBatchExecutor.java | 10 +-- 10 files changed, 49 insertions(+), 129 deletions(-) delete mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java deleted file mode 100644 index 3d2ff0b..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java +++ /dev/null @@ -1,71 +0,0 @@ -// /* Copyright (c) 2020 vesoft inc. All rights reserved. -// * -// * This source code is licensed under Apache 2.0 License. 
-// */ -// -// package org.apache.flink.connector.nebula.catalog.factory; -// -// import org.apache.flink.api.common.io.InputFormat; -// import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; -// import org.apache.flink.connector.nebula.statement.ExecutionOptions; -// import org.apache.flink.connector.nebula.table.NebulaRowDataInputFormat; -// import org.apache.flink.table.connector.ChangelogMode; -// import org.apache.flink.table.connector.source.DynamicTableSource; -// import org.apache.flink.table.connector.source.InputFormatProvider; -// import org.apache.flink.table.connector.source.LookupTableSource; -// import org.apache.flink.table.connector.source.ScanTableSource; -// import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -// import org.apache.flink.table.data.RowData; -// -// public class NebulaDynamicTableSource implements ScanTableSource, LookupTableSource, -// SupportsProjectionPushDown { -// -// private final NebulaMetaConnectionProvider metaProvider; -// private final ExecutionOptions executionOptions; -// -// public NebulaDynamicTableSource(NebulaMetaConnectionProvider metaProvider, -// ExecutionOptions executionOptions) { -// this.metaProvider = metaProvider; -// this.executionOptions = executionOptions; -// } -// -// -// @Override -// public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { -// return null; -// } -// -// @Override -// public ChangelogMode getChangelogMode() { -// return ChangelogMode.insertOnly(); -// } -// -// @Override -// public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { -// return InputFormatProvider.of(getInputFormat(metaProvider)); -// } -// -// @Override -// public DynamicTableSource copy() { -// return new NebulaDynamicTableSource(metaProvider, executionOptions); -// } -// -// @Override -// public String asSummaryString() { -// return "Nebula"; -// } -// -// @Override -// public boolean supportsNestedProjection() { -// return false; -// } -// -// @Override -// public void applyProjection(int[][] projectedFields) { -// -// } -// -// private InputFormat getInputFormat(NebulaMetaConnectionProvider metaProvider) { -// return new NebulaRowDataInputFormat(); -// } -// } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java index 402d5cd..3fa9d3f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java @@ -221,7 +221,7 @@ public NebulaClientOptions build() { case SELF: if (selfSignParams == null) { throw new IllegalArgumentException("ssl is enabled and sign type is " - + "SELF, selfSignParam must not be null"); + + "CA, selfSignParam must not be null"); } break; default: diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java index 7509e4b..e1639b8 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java @@ -5,8 +5,10 @@ package org.apache.flink.connector.nebula.table; -import java.util.*; - +import java.util.ArrayList; +import java.util.HashSet; 
+import java.util.List; +import java.util.Set; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java index c7d326e..507cd2a 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java @@ -1,5 +1,11 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + package org.apache.flink.connector.nebula.table; +import java.util.Arrays; import org.apache.flink.connector.nebula.connection.NebulaClientOptions; import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; @@ -12,8 +18,6 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.RowKind; -import java.util.Arrays; - public class NebulaDynamicTableSink implements DynamicTableSink { private final NebulaClientOptions nebulaClientOptions; private final ExecutionOptions executionOptions; @@ -38,22 +42,6 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { return builder.build(); } - // @Override - // public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - // DataStructureConverter converter = - // context.createDataStructureConverter(tableSchema.toPhysicalRowDataType()); - // NebulaGraphConnectionProvider graphProvider = - // new NebulaGraphConnectionProvider(nebulaClientOptions); - // NebulaMetaConnectionProvider metaProvider = - // new NebulaMetaConnectionProvider(nebulaClientOptions); - // NebulaBatchOutputFormat outPutFormat = - // new NebulaRowDataOutputFormat(graphProvider, metaProvider, converter); - // outPutFormat.setExecutionOptions(executionOptions); - // NebulaSinkFunction sinkFunction = new NebulaSinkFunction<>(outPutFormat); - // return SinkFunctionProvider.of(sinkFunction); - // // return OutputFormatProvider.of(outPutFormat); - // } - @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java index b33e326..dfe327e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSource.java @@ -5,6 +5,8 @@ package org.apache.flink.connector.nebula.table; + +import java.util.Arrays; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.connector.nebula.connection.NebulaClientOptions; import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider; @@ -19,8 +21,6 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; -import java.util.Arrays; - public class NebulaDynamicTableSource implements ScanTableSource { private final NebulaClientOptions nebulaClientOptions; diff --git 
a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java index 4130e9f..82851df 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java @@ -5,22 +5,13 @@ package org.apache.flink.connector.nebula.table; +import static org.apache.flink.util.Preconditions.checkNotNull; + import com.vesoft.nebula.client.graph.data.DateTimeWrapper; import com.vesoft.nebula.client.graph.data.DateWrapper; import com.vesoft.nebula.client.graph.data.TimeWrapper; import com.vesoft.nebula.client.graph.data.ValueWrapper; import com.vesoft.nebula.client.storage.data.BaseTableRow; -import org.apache.flink.connector.nebula.source.NebulaConverter; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.LocalZonedTimestampType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.types.Row; - import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.sql.Date; @@ -31,8 +22,16 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; +import org.apache.flink.connector.nebula.source.NebulaConverter; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.types.Row; /** * convert nebula {@link BaseTableRow} to flink {@link RowData} @@ -66,7 +65,8 @@ public RowData convert(BaseTableRow record) throws UnsupportedEncodingException ValueWrapper valueWrapper = values.get(pos); if (valueWrapper != null) { try { - genericRowData.setField(pos, toInternalConverters[pos].deserialize(valueWrapper)); + genericRowData.setField(pos, + toInternalConverters[pos].deserialize(valueWrapper)); } catch (SQLException e) { e.printStackTrace(); } @@ -79,11 +79,11 @@ public RowData convert(BaseTableRow record) throws UnsupportedEncodingException public Row toExternal(RowData rowData) throws SQLException { Row row = new Row(rowData.getArity()); - for(int i = 0; i < rowData.getArity(); i++) { + for (int i = 0; i < rowData.getArity(); i++) { if (!rowData.isNullAt(i)) { toExternalConverters[i].serialize(rowData, i, row); } else { - row.setField(i, null); + row.setField(i, null); } } return row; @@ -95,11 +95,11 @@ public Row toExternal(RowData rowData) throws SQLException { @FunctionalInterface interface NebulaDeserializationConverter extends Serializable { /** - * Convert a Nebula DataStructure of {@link BaseTableRow} to the internal data structure object. - * - * @param baseTableRow + * Convert a Nebula DataStructure of {@link BaseTableRow} + * to the internal data structure object. 
*/ - Object deserialize(ValueWrapper baseTableRow) throws SQLException, UnsupportedEncodingException; + Object deserialize(ValueWrapper baseTableRow) + throws SQLException, UnsupportedEncodingException; } @FunctionalInterface @@ -150,7 +150,8 @@ private NebulaDeserializationConverter createInternalConverter(LogicalType type) return val -> { if (val.isDateTime()) { DateTimeWrapper t = val.asDateTime(); - LocalDateTime localDateTime = LocalDateTime.of(t.getYear(), t.getMonth(), t.getDay(), + LocalDateTime localDateTime = LocalDateTime.of( + t.getYear(), t.getMonth(), t.getDay(), t.getHour(), t.getMinute(), t.getSecond()); return TimestampData.fromLocalDateTime(localDateTime); } else { diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java index 046a6c8..3064013 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java @@ -5,6 +5,8 @@ package org.apache.flink.connector.nebula.table; +import java.sql.SQLException; +import java.util.Map; import org.apache.flink.connector.nebula.sink.NebulaEdgeBatchExecutor; import org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter; import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; @@ -13,8 +15,6 @@ import org.apache.flink.connector.nebula.utils.VidTypeEnum; import org.apache.flink.table.data.RowData; -import java.sql.SQLException; -import java.util.Map; public class NebulaRowDataEdgeBatchExecutor extends NebulaEdgeBatchExecutor { @@ -35,7 +35,9 @@ protected void addToBatch(RowData record) { vidType, schema); NebulaEdge edge = null; try { - edge = converter.createEdge(this.nebulaConverter.toExternal(record), executionOptions.getPolicy()); + edge = converter.createEdge( + this.nebulaConverter.toExternal(record), + executionOptions.getPolicy()); } catch (SQLException e) { e.printStackTrace(); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java index bb27e2e..737876c 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java @@ -5,9 +5,8 @@ package org.apache.flink.connector.nebula.table; -import java.io.IOException; - import com.vesoft.nebula.client.storage.data.BaseTableRow; +import java.io.IOException; import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider; import org.apache.flink.connector.nebula.source.NebulaInputFormat; import org.apache.flink.connector.nebula.statement.ExecutionOptions; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java index 162acaf..674ea7f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java @@ -5,6 +5,7 @@ package org.apache.flink.connector.nebula.table; +import java.util.Map; import 
org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; import org.apache.flink.connector.nebula.sink.NebulaBatchExecutor; @@ -14,8 +15,6 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import java.util.Map; - public class NebulaRowDataOutputFormat extends NebulaBatchOutputFormat { private final LogicalType[] logicalTypes; @@ -32,11 +31,11 @@ protected NebulaBatchExecutor getBatchExecutor(VidTypeEnum vidType, boo RowType rowType = RowType.of(logicalTypes); NebulaRowDataConverter nebulaConverter = new NebulaRowDataConverter(rowType); Map schema; - NebulaBatchExecutor nebulaBatchExecutor = null; + NebulaBatchExecutor nebulaBatchExecutor; if (isVertex) { schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(), executionOptions.getLabel()); - nebulaBatchExecutor = new NebulaRowDataVertexBatchExecutor( + nebulaBatchExecutor = new NebulaRowDataVertexBatchExecutor( executionOptions, vidType, schema, nebulaConverter); } else { schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(), diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java index 7eb4272..c3b5158 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java @@ -5,7 +5,8 @@ package org.apache.flink.connector.nebula.table; - +import java.sql.SQLException; +import java.util.Map; import org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter; import org.apache.flink.connector.nebula.sink.NebulaVertexBatchExecutor; import org.apache.flink.connector.nebula.statement.ExecutionOptions; @@ -14,9 +15,6 @@ import org.apache.flink.connector.nebula.utils.VidTypeEnum; import org.apache.flink.table.data.RowData; -import java.sql.SQLException; -import java.util.Map; - public class NebulaRowDataVertexBatchExecutor extends NebulaVertexBatchExecutor { private final NebulaRowDataConverter nebulaConverter; @@ -39,7 +37,9 @@ protected void addToBatch(RowData record) { (VertexExecutionOptions) executionOptions, vidType, schema); NebulaVertex vertex = null; try { - vertex = converter.createVertex(this.nebulaConverter.toExternal(record), executionOptions.getPolicy()); + vertex = converter.createVertex( + this.nebulaConverter.toExternal(record), + executionOptions.getPolicy()); } catch (SQLException e) { e.printStackTrace(); } From c7ab5ea7c2e9bf779ffe1364d08293cfde2af7fa Mon Sep 17 00:00:00 2001 From: liuxiao Date: Mon, 11 Jul 2022 18:11:27 +0800 Subject: [PATCH 3/8] fix CI --- .../connector/nebula/utils/WriteModeEnum.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java index 7807fba..17ad34e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java @@ -28,13 +28,17 @@ public enum WriteModeEnum { } public static boolean checkValidWriteMode(String modeName) { - return 
chooseWriteMode(modeName) != INSERT || - INSERT.name().equalsIgnoreCase(modeName); + return chooseWriteMode(modeName) != INSERT + || INSERT.name().equalsIgnoreCase(modeName); } public static WriteModeEnum chooseWriteMode(String modeName) { - if (UPDATE.name().equalsIgnoreCase(modeName)) return UPDATE; - if (DELETE.name().equalsIgnoreCase(modeName)) return DELETE; + if (UPDATE.name().equalsIgnoreCase(modeName)) { + return UPDATE; + } + if (DELETE.name().equalsIgnoreCase(modeName)) { + return DELETE; + } return INSERT; } } From 3a7657b6ae0531c6dd02cf85f8d3f68b28b8b6de Mon Sep 17 00:00:00 2001 From: liuxiao Date: Sat, 23 Jul 2022 03:23:43 +0800 Subject: [PATCH 4/8] fix: handle conflict with current sink --- .../nebula/sink/NebulaBatchExecutor.java | 2 +- .../nebula/sink/NebulaEdgeBatchExecutor.java | 2 +- .../sink/NebulaVertexBatchExecutor.java | 2 +- .../table/NebulaDynamicTableFactory.java | 149 +------ .../nebula/table/NebulaRowDataConverter.java | 17 +- .../table/NebulaRowDataEdgeBatchExecutor.java | 49 --- .../table/NebulaRowDataInputFormat.java | 5 - .../table/NebulaRowDataOutputFormat.java | 48 --- .../NebulaRowDataVertexBatchExecutor.java | 51 --- .../AbstractNebulaInputFormatITTest.java | 365 ++++++++++++++++++ .../nebula/table/NebulaTableTest.java | 192 --------- 11 files changed, 401 insertions(+), 481 deletions(-) delete mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java delete mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java delete mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java delete mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java index 845175e..a787644 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java @@ -28,7 +28,7 @@ public NebulaBatchExecutor(ExecutionOptions executionOptions, * * @param record represent vertex or edge */ - protected abstract void addToBatch(T record); + abstract void addToBatch(T record); /** * execute the insert statement diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java index 8a545cf..fb38d8d 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -33,7 +33,7 @@ public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions, * put record into buffer */ @Override - protected void addToBatch(T record) { + void addToBatch(T record) { NebulaRowEdgeOutputFormatConverter converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions, vidType, schema); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java index 
bef94d8..d517925 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -36,7 +36,7 @@ public NebulaVertexBatchExecutor(ExecutionOptions executionOptions, * @param record represent vertex or edge */ @Override - protected void addToBatch(T record) { + void addToBatch(T record) { NebulaRowVertexOutputFormatConverter converter = new NebulaRowVertexOutputFormatConverter( (VertexExecutionOptions) executionOptions, vidType, schema); NebulaVertex vertex = converter.createVertex((Row) record, executionOptions.getPolicy()); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java index 38dd004..2808bbc 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java @@ -18,16 +18,14 @@ import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.connector.nebula.utils.DataTypeEnum; import org.apache.flink.connector.nebula.utils.NebulaConstant; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.connector.nebula.utils.WriteModeEnum; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; @@ -81,25 +79,25 @@ public class NebulaDynamicTableFactory implements DynamicTableSourceFactory, .key("timeout") .intType() .defaultValue(NebulaConstant.DEFAULT_TIMEOUT_MS) - .withDescription("the nebula execute timeout duration"); + .withDescription("the nebula execute timeout duration."); public static final ConfigOption SRC_ID_INDEX = ConfigOptions .key("src-id-index") .intType() .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX) - .withDescription("the nebula execute edge src index"); + .withDescription("the nebula execute edge src index."); public static final ConfigOption DST_ID_INDEX = ConfigOptions .key("dst-id-index") .intType() .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX) - .withDescription("the nebula execute edge dst index"); + .withDescription("the nebula execute edge dst index."); public static final ConfigOption RANK_ID_INDEX = ConfigOptions .key("rank-id-index") .intType() .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX) - .withDescription("the nebula execute rank index"); + .withDescription("the nebula execute rank index."); @Override public DynamicTableSink createDynamicTableSink(Context context) { @@ -116,37 +114,22 @@ public DynamicTableSink createDynamicTableSink(Context context) { getClientOptions(config), getExecutionOptions(context, config), producedDataType); } - private void validateConfigOptions(ReadableConfig config) { - if (!config.getOptional(METAADDRESS).isPresent()) { - throw new IllegalArgumentException( - String.format("The value of '%s' option should not be null", - 
METAADDRESS.key())); - } - - if (!config.getOptional(GRAPHADDRESS).isPresent()) { - throw new IllegalArgumentException( - String.format("The value of '%s' option should not be null", - GRAPHADDRESS.key())); - } - - if (!config.getOptional(USERNAME).isPresent()) { - throw new IllegalArgumentException( - String.format("The value of '%s' option should not be null", - USERNAME.key())); - } - - if (!config.getOptional(PASSWORD).isPresent()) { - throw new IllegalArgumentException( - String.format("The value of '%s' option should not be null", PASSWORD.key())); - } - - if (!config.getOptional(GRAPH_SPACE).isPresent()) { - throw new IllegalArgumentException( - String.format("The value of '%s' option should not be null", - GRAPH_SPACE.key())); - } + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig readableConfig = helper.getOptions(); + helper.validate(); + validateConfigOptions(readableConfig); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + ExecutionOptions executionOptions = getExecutionOptions(context, readableConfig); + NebulaClientOptions nebulaClientOptions = getClientOptions(readableConfig); + return new NebulaDynamicTableSource(nebulaClientOptions, executionOptions, physicalSchema); + } - if (config.get(TIMEOUT) < 0) { + private void validateConfigOptions(ReadableConfig config) { + if (config.getOptional(TIMEOUT).isPresent() && config.get(TIMEOUT) < 0) { throw new IllegalArgumentException( String.format("The value of '%s' option should not be negative, but is %s.", TIMEOUT.key(), config.get(TIMEOUT))); @@ -167,7 +150,6 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con List positions = new ArrayList<>(); List columns = context.getCatalogTable().getResolvedSchema().getColumns(); - if (config.get(DATA_TYPE).isVertex()) { for (int i = 1; i < columns.size(); i++) { positions.add(i); @@ -201,97 +183,6 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con } } - @Override - public DynamicTableSource createDynamicTableSource(Context context) { - final FactoryUtil.TableFactoryHelper helper = - FactoryUtil.createTableFactoryHelper(this, context); - final ReadableConfig readableConfig = helper.getOptions(); - helper.validate(); - validateConfigOptions(readableConfig); - TableSchema physicalSchema = - TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - ExecutionOptions executionOptions = - getExecutionOptions(readableConfig, physicalSchema, context); - NebulaClientOptions nebulaClientOptions = getNebulaClientOptions(readableConfig); - return new NebulaDynamicTableSource(nebulaClientOptions, executionOptions, physicalSchema); - } - - @Override - public DynamicTableSink createDynamicTableSink(Context context) { - final FactoryUtil.TableFactoryHelper helper = - FactoryUtil.createTableFactoryHelper(this, context); - final ReadableConfig readableConfig = helper.getOptions(); - helper.validate(); - validateConfigOptions(readableConfig); - TableSchema physicalSchema = - TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - ExecutionOptions executionOptions = - getExecutionOptions(readableConfig, physicalSchema, context); - NebulaClientOptions nebulaClientOptions = getNebulaClientOptions(readableConfig); - return new NebulaDynamicTableSink(nebulaClientOptions, executionOptions, 
physicalSchema); - } - - private ExecutionOptions getExecutionOptions(ReadableConfig readableConfig, - TableSchema physicalSchema, Context context) { - String[] fieldNames = physicalSchema.getFieldNames(); - List fieldList = new ArrayList<>(); - List positionList = new ArrayList<>(); - String objectName = context.getObjectIdentifier().getObjectName(); - String[] typeAndLabel = objectName.split(NebulaConstant.SPLIT_POINT); - String type = typeAndLabel[0]; - WriteModeEnum writeMode = WriteModeEnum.chooseWriteMode(readableConfig.get(WRITE_MODE)); - - ExecutionOptions executionOptions; - if (DataTypeEnum.VERTEX.name().equals(type)) { - for (int i = 1; i < fieldNames.length; i++) { - fieldList.add(fieldNames[i]); - positionList.add(i); - } - executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() - .setGraphSpace(readableConfig.get(GRAPH_SPACE)) - .setTag(readableConfig.get(LABEL_NAME)) - .setIdIndex(0) - .setFields(fieldList) - .setPositions(positionList) - .setWriteMode(writeMode) - .builder(); - } else { - for (int i = 3; i < fieldNames.length; i++) { - fieldList.add(fieldNames[i]); - positionList.add(i); - } - executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder() - .setGraphSpace(readableConfig.get(GRAPH_SPACE)) - .setEdge(readableConfig.get(LABEL_NAME)) - .setSrcIndex(readableConfig.get(SRC_INDEX)) - .setDstIndex(readableConfig.get(DST_INDEX)) - .setRankIndex(readableConfig.get(RANK_INDEX)) - .setFields(fieldList) - .setPositions(positionList) - .setWriteMode(writeMode) - .builder(); - } - return executionOptions; - } - - private NebulaClientOptions getNebulaClientOptions(ReadableConfig readableConfig) { - return new NebulaClientOptions.NebulaClientOptionsBuilder() - .setMetaAddress(readableConfig.get(METAADDRESS)) - .setGraphAddress(readableConfig.get(GRAPHADDRESS)) - .setUsername(readableConfig.get(USERNAME)) - .setPassword(readableConfig.get(PASSWORD)) - .build(); - } - - private void validateConfigOptions(ReadableConfig readableConfig) { - String writeMode = readableConfig.get(WRITE_MODE); - if (!WriteModeEnum.checkValidWriteMode(writeMode)) { - throw new IllegalArgumentException( - String.format("Unknown sink.write-mode `%s`", writeMode) - ); - } - } - @Override public String factoryIdentifier() { return IDENTIFIER; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java index 82851df..68f9715 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java @@ -105,7 +105,7 @@ Object deserialize(ValueWrapper baseTableRow) @FunctionalInterface interface NebulaSerializationConverter extends Serializable { /** - * Convert a internal field to java object and fill into the Row. + * Convert an internal field to java object and fill into the Row. */ void serialize(RowData rowData, int index, Row row) throws SQLException; } @@ -127,9 +127,18 @@ private NebulaDeserializationConverter createInternalConverter(LogicalType type) return ValueWrapper::asDouble; case CHAR: case VARCHAR: - return val -> val.isGeography() ? 
StringData.fromString( - val.asGeography().toString()) - : StringData.fromString(val.asString()); + return val -> { + if (val.isGeography()) { + return StringData.fromString( + val.asGeography().toString() + ); + } else { + return StringData.fromString(val.asString()); + } + }; + // return val -> val.isGeography() ? StringData.fromString( + // val.asGeography().toString()) + // : StringData.fromString(val.asString()); case DATE: return val -> { DateWrapper dateWrapper = val.asDate(); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java deleted file mode 100644 index 3064013..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataEdgeBatchExecutor.java +++ /dev/null @@ -1,49 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -package org.apache.flink.connector.nebula.table; - -import java.sql.SQLException; -import java.util.Map; -import org.apache.flink.connector.nebula.sink.NebulaEdgeBatchExecutor; -import org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter; -import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; -import org.apache.flink.connector.nebula.statement.ExecutionOptions; -import org.apache.flink.connector.nebula.utils.NebulaEdge; -import org.apache.flink.connector.nebula.utils.VidTypeEnum; -import org.apache.flink.table.data.RowData; - - -public class NebulaRowDataEdgeBatchExecutor extends NebulaEdgeBatchExecutor { - - private final NebulaRowDataConverter nebulaConverter; - - public NebulaRowDataEdgeBatchExecutor(ExecutionOptions executionOptions, - VidTypeEnum vidType, Map schema, - NebulaRowDataConverter nebulaConverter) { - super(executionOptions, vidType, schema); - this.nebulaConverter = nebulaConverter; - } - - @Override - protected void addToBatch(RowData record) { - NebulaRowEdgeOutputFormatConverter converter = - new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions, - vidType, schema); - NebulaEdge edge = null; - try { - edge = converter.createEdge( - this.nebulaConverter.toExternal(record), - executionOptions.getPolicy()); - } catch (SQLException e) { - e.printStackTrace(); - } - if (edge == null) { - return; - } - nebulaEdgeList.add(edge); - } -} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java index 737876c..c9e08d7 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataInputFormat.java @@ -18,11 +18,6 @@ /** * implementation of NebulaInputFormat. * Read NebulaGraph data in nebula's {@link BaseTableRow} format. 
- * how to use: - * NebulaInputTableRowFormat inputFormat = new NebulaInputTableRowFormat - * (storageConnectionProvider, vertexExecutionOptions); - * DataSource dataSource = env.createInput(inputFormat); - * */ public class NebulaRowDataInputFormat extends NebulaInputFormat { diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java deleted file mode 100644 index 674ea7f..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataOutputFormat.java +++ /dev/null @@ -1,48 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -package org.apache.flink.connector.nebula.table; - -import java.util.Map; -import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; -import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; -import org.apache.flink.connector.nebula.sink.NebulaBatchExecutor; -import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat; -import org.apache.flink.connector.nebula.utils.VidTypeEnum; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; - -public class NebulaRowDataOutputFormat extends NebulaBatchOutputFormat { - - private final LogicalType[] logicalTypes; - - public NebulaRowDataOutputFormat(NebulaGraphConnectionProvider graphProvider, - NebulaMetaConnectionProvider metaProvider, - LogicalType[] logicalTypes) { - super(graphProvider, metaProvider); - this.logicalTypes = logicalTypes; - } - - @Override - protected NebulaBatchExecutor getBatchExecutor(VidTypeEnum vidType, boolean isVertex) { - RowType rowType = RowType.of(logicalTypes); - NebulaRowDataConverter nebulaConverter = new NebulaRowDataConverter(rowType); - Map schema; - NebulaBatchExecutor nebulaBatchExecutor; - if (isVertex) { - schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(), - executionOptions.getLabel()); - nebulaBatchExecutor = new NebulaRowDataVertexBatchExecutor( - executionOptions, vidType, schema, nebulaConverter); - } else { - schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(), - executionOptions.getLabel()); - nebulaBatchExecutor = new NebulaRowDataEdgeBatchExecutor( - executionOptions, vidType, schema, nebulaConverter); - } - return nebulaBatchExecutor; - } -} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java deleted file mode 100644 index c3b5158..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataVertexBatchExecutor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -package org.apache.flink.connector.nebula.table; - -import java.sql.SQLException; -import java.util.Map; -import org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter; -import org.apache.flink.connector.nebula.sink.NebulaVertexBatchExecutor; -import org.apache.flink.connector.nebula.statement.ExecutionOptions; -import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; -import org.apache.flink.connector.nebula.utils.NebulaVertex; -import org.apache.flink.connector.nebula.utils.VidTypeEnum; -import org.apache.flink.table.data.RowData; - -public class NebulaRowDataVertexBatchExecutor extends NebulaVertexBatchExecutor { - private final NebulaRowDataConverter nebulaConverter; - - public NebulaRowDataVertexBatchExecutor(ExecutionOptions executionOptions, - VidTypeEnum vidType, - Map schema, - NebulaRowDataConverter nebulaConverter) { - super(executionOptions, vidType, schema); - this.nebulaConverter = nebulaConverter; - } - - /** - * put record into buffer - * - * @param record represent vertex or edge - */ - @Override - protected void addToBatch(RowData record) { - NebulaRowVertexOutputFormatConverter converter = new NebulaRowVertexOutputFormatConverter( - (VertexExecutionOptions) executionOptions, vidType, schema); - NebulaVertex vertex = null; - try { - vertex = converter.createVertex( - this.nebulaConverter.toExternal(record), - executionOptions.getPolicy()); - } catch (SQLException e) { - e.printStackTrace(); - } - if (vertex == null) { - return; - } - nebulaVertexList.add(vertex); - } -} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java new file mode 100644 index 0000000..c010902 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java @@ -0,0 +1,365 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +package org.apache.flink.connector.nebula.source; + +import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.IOErrorException; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.flink.connector.nebula.utils.NebulaConstant; +import org.apache.flink.connector.nebula.utils.NebulaEdge; +import org.apache.flink.connector.nebula.utils.NebulaEdges; +import org.apache.flink.connector.nebula.utils.NebulaVertex; +import org.apache.flink.connector.nebula.utils.NebulaVertices; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AbstractNebulaInputFormatITTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AbstractNebulaInputFormatITTest.class); + private static final String META_ADDRESS = "192.168.200.135:9559"; + private static final String GRAPH_ADDRESS = "192.168.200.135:9669"; + private static final String USERNAME = "root"; + private static final String PASSWORD = "nebula"; + + private static final String[] stats = new String[]{ + "CREATE SPACE IF NOT EXISTS `flinkSink722` (partition_num = 100, charset = utf8," + + " replica_factor = 3, collate = utf8_bin, vid_type = INT64);" + + "USE `flinkSink722`;", + "CREATE TAG IF NOT EXISTS person (col1 string, col2 fixed_string(8), col3 int8," + + " col4 int16, col5 int32," + + " col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool," + + " col11 double," + + " col12 float, col13 time, col14 geography);", + "CREATE EDGE IF NOT EXISTS friend (col1 string, col2 fixed_string(8), col3 int8," + + " col4 int16, col5 int32," + + " col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool," + + " col11 double," + + " col12 float, col13 time, col14 geography);" + }; + private final String[] colNames = {"col1", "col2", "col3", "col4", "col5", "col6", "col7", + "col8", "col9", "col10", "col11", "col12", "col13", "col14"}; + + /** + * construct flink vertex data + */ + private static List> constructVertexSourceData() { + List> players = new ArrayList<>(); + List fields1 = Arrays.asList("61", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POINT(1 3)\")"); + List fields2 = Arrays.asList("62", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POINT(1 3)\")"); + List fields3 = Arrays.asList("63", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POINT(1 3)\")"); + List fields4 = Arrays.asList("64", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", 
"1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"LINESTRING(1 3,2 4)\")"); + List fields5 = Arrays.asList("65", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"LINESTRING(1 3,2 4)\")"); + List fields6 = Arrays.asList("66", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"LINESTRING(1 3,2 4)\")"); + List fields7 = Arrays.asList("67", "\"李四\"", "\"abcdefgh\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "true", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"polygon((0 1,1 2,2 3,0 1))\")"); + List fields8 = Arrays.asList("68", "\"aba\"", "\"张三\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "true", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POLYGON((0 1,1 2,2 3,0 1))\")"); + players.add(fields1); + players.add(fields2); + players.add(fields3); + players.add(fields4); + players.add(fields5); + players.add(fields6); + players.add(fields7); + players.add(fields8); + return players; + } + + /** + * construct flink edge data + */ + private static List> constructEdgeSourceData() { + List> friends = new ArrayList<>(); + List fields1 = Arrays.asList("61", "62", "\"aba\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POINT(1 3)\")"); + List fields2 = Arrays.asList("62", "63", "\"aba\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POINT(1 3)\")"); + List fields3 = Arrays.asList("63", "64", "\"aba\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POINT(1 3)\")"); + List fields4 = Arrays.asList("64", "65", "\"aba\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"LINESTRING(1 3,2 4)\")"); + List fields5 = Arrays.asList("65", "66", "\"aba\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"LINESTRING(1 3,2 4)\")"); + List fields6 = Arrays.asList("66", "67", "\"aba\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "false", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"LINESTRING(1 3,2 4)\")"); + List fields7 = Arrays.asList("67", "68", "\"李四\"", "\"abcdefgh\"", "1", "1111", + "22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + "435463424", "true", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"polygon((0 1,1 2,2 3,0 1))\")"); + List fields8 = Arrays.asList("68", "61", "\"aba\"", "\"张三\"", "1", "1111", "22222", + "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")", + 
"435463424", "true", "1.2", "1.0", "time(\"11:12:12\")", + "ST_GeogFromText(\"POLYGON((0 1,1 2,2 3,0 1))\")"); + friends.add(fields1); + friends.add(fields2); + friends.add(fields3); + friends.add(fields4); + friends.add(fields5); + friends.add(fields6); + friends.add(fields7); + friends.add(fields8); + return friends; + } + + @Before + public void mockNebulaData() { + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(100); + String[] addrAndPort = GRAPH_ADDRESS.split(NebulaConstant.COLON); + List addresses = Arrays.asList( + new HostAddress(addrAndPort[0], Integer.parseInt(addrAndPort[1]))); + NebulaPool pool = new NebulaPool(); + Session session = null; + try { + boolean initResult = pool.init(addresses, nebulaPoolConfig); + if (!initResult) { + LOGGER.error("pool init failed."); + assert false; + } + session = pool.getSession(USERNAME, PASSWORD, true); + createSchema(session); + insertData(session); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (session != null) { + session.release(); + } + pool.close(); + } + } + + @Test + public void testVertexSource() throws ExecutionException, InterruptedException { + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + + String creatSourceDDL = "CREATE TABLE `person` (" + + " vid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '192.168.200.135:9559'," + + " 'graph-address' = '192.168.200.135:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'data-type' = 'vertex'," + + " 'graph-space' = 'flinkSink722'" + + ")"; + tableEnv.executeSql(creatSourceDDL); + + String creatSinkDDL = "CREATE TABLE `personSink` (" + + " vid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'print'" + + ")"; + tableEnv.executeSql(creatSinkDDL); + + Table table = tableEnv.sqlQuery("SELECT * FROM `person`"); + table.executeInsert("`personSink`").await(); + } + + @Test + public void testEdgeSource() throws ExecutionException, InterruptedException { + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + + String creatSourceDDL = "CREATE TABLE `friend` (" + + " sid BIGINT," + + " did BIGINT," + + " rid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'nebula'," + + " 'meta-address' = '192.168.200.135:9559'," + + " 'graph-address' = '192.168.200.135:9669'," + + " 'username' = 'root'," + + " 'password' = 'nebula'," + + " 'graph-space' = 'flinkSink722'," + + " 'data-type'='edge'," + + " 'src-id-index'='0'," + + " 'dst-id-index'='1'," + + " 
'rank-id-index'='2'" + + ")"; + tableEnv.executeSql(creatSourceDDL); + + String creatSinkDDL = "CREATE TABLE `friendSink` (" + + " sid BIGINT," + + " did BIGINT," + + " rid BIGINT," + + " col1 STRING," + + " col2 STRING," + + " col3 INT," + + " col4 INT," + + " col5 INT," + + " col6 BIGINT," + + " col7 DATE," + + " col8 TIMESTAMP," + + " col9 BIGINT," + + " col10 BOOLEAN," + + " col11 DOUBLE," + + " col12 DOUBLE," + + " col13 TIME," + + " col14 STRING" + + ") WITH (" + + " 'connector' = 'print'" + + ")"; + tableEnv.executeSql(creatSinkDDL); + + Table table = tableEnv.sqlQuery("SELECT * FROM `friend`"); + table.executeInsert("`friendSink`").await(); + } + + private void insertData(Session session) throws IOErrorException { + executeSql(getVertexInsertStat(), session); + executeSql(getEdgeInsertStat(), session); + } + + private String getVertexInsertStat() { + List> players = constructVertexSourceData(); + List vertices = new ArrayList<>(); + for (List player : players) { + vertices.add(new NebulaVertex( + player.get(0), player.subList(1, player.size()))); + } + NebulaVertices nebulaVertices = new NebulaVertices( + "person", + Arrays.asList(colNames), + vertices, + null + ); + return nebulaVertices.getInsertStatement(); + } + + private String getEdgeInsertStat() { + List> friends = constructEdgeSourceData(); + List edges = new ArrayList<>(); + for (List friend : friends) { + edges.add(new NebulaEdge( + friend.get(0), friend.get(1), 0L, friend.subList(2, friend.size()))); + } + NebulaEdges nebulaEdges = new NebulaEdges( + "friend", + Arrays.asList(colNames), + edges, + null, + null + ); + return nebulaEdges.getInsertStatement(); + } + + private void createSchema(Session session) throws IOErrorException { + for (String stat : stats) { + executeSql(stat, session); + } + } + + private void executeSql(String stat, Session session) throws IOErrorException { + ResultSet resp = session.execute(stat); + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (!resp.isSucceeded()) { + LOGGER.error("Execute {}, but failed {}", stat, resp.getErrorMessage()); + assert (false); + } + } +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java deleted file mode 100644 index 2aa93d9..0000000 --- a/connector/src/test/java/org/apache/flink/connector/nebula/table/NebulaTableTest.java +++ /dev/null @@ -1,192 +0,0 @@ -package org.apache.flink.connector.nebula.table; - - -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * make sure your environment has creates space, and data has been insert into this space. - * Space schema: - * - *

"CREATE SPACE `testFlinkSource` (partition_num = 100, replica_factor = 3, charset = utf8, - * collate = utf8_bin, vid_type = INT64, atomic_edge = false)" - * - *

"USE `testFlinkSource`" - * - *

"CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, - * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, - * col12 float, col13 time, col14 geography);" - * - *

"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, - * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, - * col12 float, col13 time, col14 geography);" - * - *

"CREATE SPACE `testFlinkSink` (partition_num = 100, replica_factor = 3, charset = utf8, - * collate = utf8_bin, vid_type = INT64, atomic_edge = false)" - * - *

"USE `testFlinkSink`" - * - *

"CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, - * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, - * col12 float, col13 time, col14 geography);" - * - *

"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, - * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, - * col12 float, col13 time, col14 geography);" - */ -public class NebulaTableTest { - private static final Logger log = LoggerFactory.getLogger(NebulaTableTest.class); - - @Test - public void testVertexTransfer() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - EnvironmentSettings settings = EnvironmentSettings.newInstance() - .inStreamingMode() - .useBlinkPlanner() - .build(); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); - - String creatSourceDDL = "CREATE TABLE `VERTEX.personSource` (" - + " vid BIGINT," - + " col1 STRING," - + " col2 STRING," - + " col3 INT," - + " col4 INT," - + " col5 INT," - + " col6 BIGINT," - + " col7 DATE," - + " col8 TIMESTAMP," - + " col9 BIGINT," - + " col10 BOOLEAN," - + " col11 DOUBLE," - + " col12 DOUBLE," - + " col13 TIME," - + " col14 STRING" - + ") WITH (" - + " 'connector' = 'nebula'," - + " 'meta-address' = '192.168.200.135:9559'," - + " 'graph-address' = '192.168.200.135:9669'," - + " 'username' = 'root'," - + " 'password' = 'nebula'," - + " 'graph-space' = 'testFlinkSource'," - + " 'label-name' = 'person'" - + ")"; - tableEnv.executeSql(creatSourceDDL); - - String creatSinkDDL = "CREATE TABLE `VERTEX.personSink` (" - + " vid BIGINT," - + " col1 STRING," - + " col2 STRING," - + " col3 INT," - + " col4 INT," - + " col5 INT," - + " col6 BIGINT," - + " col7 DATE," - + " col8 TIMESTAMP," - + " col9 BIGINT," - + " col10 BOOLEAN," - + " col11 DOUBLE," - + " col12 DOUBLE," - + " col13 TIME," - + " col14 STRING" - + ") WITH (" - + " 'connector' = 'nebula'," - + " 'meta-address' = '192.168.200.135:9559'," - + " 'graph-address' = '192.168.200.135:9669'," - + " 'username' = 'root'," - + " 'password' = 'nebula'," - + " 'graph-space' = 'testFlinkSink'," - + " 'label-name' = 'person'" - + ")"; - tableEnv.executeSql(creatSinkDDL); - - Table table = tableEnv.sqlQuery("SELECT * FROM `VERTEX.personSource`"); - table.executeInsert("`VERTEX.personSink`"); - } - - @Test - public void testEdgeTransfer() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - EnvironmentSettings settings = EnvironmentSettings.newInstance() - .inStreamingMode() - .useBlinkPlanner() - .build(); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); - - String creatSourceDDL = "CREATE TABLE `EDGE.friendSource` (" - + " sid BIGINT," - + " eid BIGINT," - + " rid BIGINT," - + " col1 STRING," - + " col2 STRING," - + " col3 INT," - + " col4 INT," - + " col5 INT," - + " col6 BIGINT," - + " col7 DATE," - + " col8 TIMESTAMP," - + " col9 BIGINT," - + " col10 BOOLEAN," - + " col11 DOUBLE," - + " col12 DOUBLE," - + " col13 TIME," - + " col14 STRING" - + ") WITH (" - + " 'connector' = 'nebula'," - + " 'meta-address' = '192.168.200.135:9559'," - + " 'graph-address' = '192.168.200.135:9669'," - + " 'username' = 'root'," - + " 'password' = 'nebula'," - + " 'graph-space' = 'testFlinkSource'," - + " 'label-name' = 'friend'," - + " 'src-index' = '0'," - + " 'dst-index' = '1'," - + " 'rank-index' = '2'" - + ")"; - tableEnv.executeSql(creatSourceDDL); - - String creatSinkDDL = "CREATE TABLE `EDGE.friendSink` (" - + " sid BIGINT," - + " eid BIGINT," - + " rid BIGINT," - + " col1 STRING," - + " col2 STRING," - + " col3 INT," - + " col4 
INT," - + " col5 INT," - + " col6 BIGINT," - + " col7 DATE," - + " col8 TIMESTAMP," - + " col9 BIGINT," - + " col10 BOOLEAN," - + " col11 DOUBLE," - + " col12 DOUBLE," - + " col13 TIME," - + " col14 STRING" - + ") WITH (" - + " 'connector' = 'nebula'," - + " 'meta-address' = '192.168.200.135:9559'," - + " 'graph-address' = '192.168.200.135:9669'," - + " 'username' = 'root'," - + " 'password' = 'nebula'," - + " 'graph-space' = 'testFlinkSink'," - + " 'label-name' = 'friend'," - + " 'src-index' = '0'," - + " 'dst-index' = '1'," - + " 'rank-index' = '2'" - + ")"; - tableEnv.executeSql(creatSinkDDL); - - Table table = tableEnv.sqlQuery("SELECT * FROM `EDGE.friendSource`"); - table.executeInsert("`EDGE.friendSink`"); - } -} From 6dbf56b96c30c7bafa33355b4134acd85b024be8 Mon Sep 17 00:00:00 2001 From: liuxiao Date: Sat, 23 Jul 2022 03:59:39 +0800 Subject: [PATCH 5/8] fix assertion error --- .../AbstractNebulaInputFormatITTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java index c010902..ef7adf7 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java @@ -34,15 +34,15 @@ public class AbstractNebulaInputFormatITTest { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNebulaInputFormatITTest.class); - private static final String META_ADDRESS = "192.168.200.135:9559"; - private static final String GRAPH_ADDRESS = "192.168.200.135:9669"; + private static final String META_ADDRESS = "127.0.0.1:9559"; + private static final String GRAPH_ADDRESS = "127.0.0.1:9669"; private static final String USERNAME = "root"; private static final String PASSWORD = "nebula"; private static final String[] stats = new String[]{ - "CREATE SPACE IF NOT EXISTS `flinkSink722` (partition_num = 100, charset = utf8," + "CREATE SPACE IF NOT EXISTS `flinkSink` (partition_num = 100, charset = utf8," + " replica_factor = 3, collate = utf8_bin, vid_type = INT64);" - + "USE `flinkSink722`;", + + "USE `flinkSink`;", "CREATE TAG IF NOT EXISTS person (col1 string, col2 fixed_string(8), col3 int8," + " col4 int16, col5 int32," + " col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool," @@ -166,7 +166,7 @@ public void mockNebulaData() { boolean initResult = pool.init(addresses, nebulaPoolConfig); if (!initResult) { LOGGER.error("pool init failed."); - assert false; + assert (false); } session = pool.getSession(USERNAME, PASSWORD, true); createSchema(session); @@ -206,12 +206,12 @@ public void testVertexSource() throws ExecutionException, InterruptedException { + " col14 STRING" + ") WITH (" + " 'connector' = 'nebula'," - + " 'meta-address' = '192.168.200.135:9559'," - + " 'graph-address' = '192.168.200.135:9669'," + + " 'meta-address' = '127.0.0.1:9559'," + + " 'graph-address' = '127.0.0.1:9669'," + " 'username' = 'root'," + " 'password' = 'nebula'," + " 'data-type' = 'vertex'," - + " 'graph-space' = 'flinkSink722'" + + " 'graph-space' = 'flinkSink'" + ")"; tableEnv.executeSql(creatSourceDDL); @@ -267,11 +267,11 @@ public void testEdgeSource() throws ExecutionException, InterruptedException { + " col14 STRING" + ") WITH (" + " 'connector' = 'nebula'," - + " 'meta-address' = '192.168.200.135:9559'," - 
+ " 'graph-address' = '192.168.200.135:9669'," + + " 'meta-address' = '127.0.0.1:9559'," + + " 'graph-address' = '127.0.0.1:9669'," + " 'username' = 'root'," + " 'password' = 'nebula'," - + " 'graph-space' = 'flinkSink722'," + + " 'graph-space' = 'flinkSink'," + " 'data-type'='edge'," + " 'src-id-index'='0'," + " 'dst-id-index'='1'," From 58d7e87b0acc6fbc7c289505fee1e96d889382ea Mon Sep 17 00:00:00 2001 From: liuxiao Date: Thu, 4 Aug 2022 01:56:53 +0800 Subject: [PATCH 6/8] fix by review suggestion --- .../nebula/table/NebulaRowDataConverter.java | 18 ++++-------------- .../connector/nebula/utils/WriteModeEnum.java | 15 --------------- 2 files changed, 4 insertions(+), 29 deletions(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java index 68f9715..9332a98 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaRowDataConverter.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2022 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. */ @@ -119,7 +119,6 @@ private NebulaDeserializationConverter createInternalConverter(LogicalType type) case TINYINT: case SMALLINT: case INTEGER: - return val -> (int) val.asLong(); case BIGINT: return ValueWrapper::asLong; case FLOAT: @@ -127,18 +126,9 @@ private NebulaDeserializationConverter createInternalConverter(LogicalType type) return ValueWrapper::asDouble; case CHAR: case VARCHAR: - return val -> { - if (val.isGeography()) { - return StringData.fromString( - val.asGeography().toString() - ); - } else { - return StringData.fromString(val.asString()); - } - }; - // return val -> val.isGeography() ? StringData.fromString( - // val.asGeography().toString()) - // : StringData.fromString(val.asString()); + return val -> val.isGeography() ? 
From e4d97a5a08be8823ea9f126379edcfbb54466cd0 Mon Sep 17 00:00:00 2001
From: liuxiao
Date: Thu, 4 Aug 2022 10:10:29 +0800
Subject: [PATCH 7/8] fix bigint convert

---
 .../source/AbstractNebulaInputFormatITTest.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java
index ef7adf7..025b768 100644
--- a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java
+++ b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java
@@ -192,9 +192,9 @@ public void testVertexSource() throws ExecutionException, InterruptedException {
                 + " vid BIGINT,"
                 + " col1 STRING,"
                 + " col2 STRING,"
-                + " col3 INT,"
-                + " col4 INT,"
-                + " col5 INT,"
+                + " col3 BIGINT,"
+                + " col4 BIGINT,"
+                + " col5 BIGINT,"
                 + " col6 BIGINT,"
                 + " col7 DATE,"
                 + " col8 TIMESTAMP,"
@@ -219,9 +219,9 @@ public void testVertexSource() throws ExecutionException, InterruptedException {
                 + " vid BIGINT,"
                 + " col1 STRING,"
                 + " col2 STRING,"
-                + " col3 INT,"
-                + " col4 INT,"
-                + " col5 INT,"
+                + " col3 BIGINT,"
+                + " col4 BIGINT,"
+                + " col5 BIGINT,"
                 + " col6 BIGINT,"
                 + " col7 DATE,"
                 + " col8 TIMESTAMP,"
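
[Note] Patches 7 and 8 are the test-side consequence of the converter change in patch 6: once every Nebula integer width is deserialized via ValueWrapper::asLong, a Flink column declared INT receives a Long where GenericRowData expects an Integer. The snippet below is a minimal illustration of that mismatch, not code from the patch; the class name is made up for the example.

    import org.apache.flink.table.data.GenericRowData;

    public class BigintSketch {
        public static void main(String[] args) {
            GenericRowData row = new GenericRowData(1);
            row.setField(0, 42L);        // what the converter now produces for int8..int64

            long ok = row.getLong(0);    // BIGINT column: reads cleanly
            System.out.println(ok);

            int bad = row.getInt(0);     // INT column: ClassCastException (Long is not Integer)
            System.out.println(bad);
        }
    }
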
From 9158322d474889ced597d771d1176b82fb274b20 Mon Sep 17 00:00:00 2001
From: liuxiao
Date: Thu, 4 Aug 2022 10:16:50 +0800
Subject: [PATCH 8/8] fix edge bigint convert

---
 .../source/AbstractNebulaInputFormatITTest.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java
index 025b768..48ad693 100644
--- a/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java
+++ b/connector/src/test/java/org/apache/flink/connector/nebula/source/AbstractNebulaInputFormatITTest.java
@@ -253,9 +253,9 @@ public void testEdgeSource() throws ExecutionException, InterruptedException {
                 + " rid BIGINT,"
                 + " col1 STRING,"
                 + " col2 STRING,"
-                + " col3 INT,"
-                + " col4 INT,"
-                + " col5 INT,"
+                + " col3 BIGINT,"
+                + " col4 BIGINT,"
+                + " col5 BIGINT,"
                 + " col6 BIGINT,"
                 + " col7 DATE,"
                 + " col8 TIMESTAMP,"
@@ -285,9 +285,9 @@ public void testEdgeSource() throws ExecutionException, InterruptedException {
                 + " rid BIGINT,"
                 + " col1 STRING,"
                 + " col2 STRING,"
-                + " col3 INT,"
-                + " col4 INT,"
-                + " col5 INT,"
+                + " col3 BIGINT,"
+                + " col4 BIGINT,"
+                + " col5 BIGINT,"
                 + " col6 BIGINT,"
                 + " col7 DATE,"
                 + " col8 TIMESTAMP,"