[BUG] [DOC] Fix missing areas in the development documentation #7959
Adds supplementary explanations of the classes and methods that must be implemented when developing a source connector.
YOMO-Lee committed Nov 2, 2024
1 parent 74791dc commit 45fb3e6
Showing 2 changed files with 25 additions and 17 deletions.
16 changes: 10 additions & 6 deletions seatunnel-connectors-v2/README.md
@@ -49,14 +49,19 @@ own connectors, you need to follow the steps below.

3.Create two packages corresponding to source and sink

​ package org.apache.seatunnel.connectors.seatunnel.{connector name}}.source

​ package org.apache.seatunnel.connectors.seatunnel.{connector name}}.sink
package org.apache.seatunnel.connectors.seatunnel.{connector name}.source
package org.apache.seatunnel.connectors.seatunnel.{connector name}.sink

4.Add the connector info to the plugin-mapping.properties file in the seatunnel root path.

5.Add the connector dependency to seatunnel-dist/pom.xml, so that the connector jar can be found in the binary package.

6.Several classes must be implemented on the source side: {connector name}Source, {connector name}SourceFactory, and {connector name}SourceReader. Please refer to other connectors for details.

7.{connector name}SourceFactory must be annotated with **@AutoService(Factory.class)** on the class, and in addition to the required methods, it must also override the `createSource` method.

8.{connector name}Source needs to override the `getProducedCatalogTables` method.
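
Steps 6 to 8 can be sketched roughly as follows. This is a minimal illustration, not the SeaTunnel API: `Factory`, `SourceFactory`, `Source`, `DemoSource`, and `DemoSourceFactory` are simplified stand-ins defined here so that the example compiles on its own, and the real interfaces have more methods and different parameter and return types. The `@AutoService(Factory.class)` annotation is mentioned only in a comment because it comes from the Google AutoService library, which is not on the classpath of this sketch.

```java
import java.util.List;

// Stand-in types for illustration only; the real SeaTunnel interfaces
// have more methods and different signatures.
interface Factory {
    String factoryIdentifier();
}

interface Source {
    List<String> getProducedCatalogTables();
}

interface SourceFactory extends Factory {
    Source createSource(String tablePath);
}

// Step 8: the source reports the tables it produces.
class DemoSource implements Source {
    private final String tablePath;

    DemoSource(String tablePath) {
        this.tablePath = tablePath;
    }

    @Override
    public List<String> getProducedCatalogTables() {
        return List.of(tablePath);
    }
}

// Step 7: in a real connector this class would carry
// @AutoService(Factory.class), which generates the service registration
// file used by Java's ServiceLoader mechanism.
class DemoSourceFactory implements SourceFactory {
    @Override
    public String factoryIdentifier() {
        return "Demo";
    }

    // The factory method that step 7 asks connector authors to override.
    @Override
    public Source createSource(String tablePath) {
        return new DemoSource(tablePath);
    }
}

public class ConnectorSketch {
    public static void main(String[] args) {
        SourceFactory factory = new DemoSourceFactory();
        Source source = factory.createSource("db.users");
        System.out.println(factory.factoryIdentifier() + " -> " + source.getProducedCatalogTables());
    }
}
```

For the actual method signatures, the existing connectors in the repository remain the reference, as step 6 suggests.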

### **Startup Class**

Aside from the old startup class, we have created two new startup modules,
@@ -134,7 +139,7 @@ completed by implementing this interface.
these 100 pieces of data for batch processing. Stream processing does not have this requirement, so most SourceReaders
that support both stream and batch processing will have the following code:

```java
```
if(Boundedness.BOUNDED.equals(context.getBoundedness())){
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
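
To show where such a check might sit inside a reader, here is a minimal self-contained sketch. `ReaderContext` and `ListReader` are hypothetical stand-ins written for this example, and `Boundedness` is redefined locally; none of them are the SeaTunnel classes, and the polling loop in `main` only imitates what an engine would do.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in types for illustration only; not the SeaTunnel API.
enum Boundedness { BOUNDED, UNBOUNDED }

class ReaderContext {
    private final Boundedness boundedness;
    private boolean noMoreElement = false;

    ReaderContext(Boundedness boundedness) {
        this.boundedness = boundedness;
    }

    Boundedness getBoundedness() {
        return boundedness;
    }

    void signalNoMoreElement() {
        noMoreElement = true;
    }

    boolean isFinished() {
        return noMoreElement;
    }
}

class ListReader {
    private final Iterator<String> rows;
    private final ReaderContext context;

    ListReader(List<String> rows, ReaderContext context) {
        this.rows = rows.iterator();
        this.context = context;
    }

    // Emits at most one row per call; once the rows are exhausted,
    // signals the end of the data in bounded (batch) mode only.
    void pollNext(List<String> output) {
        if (rows.hasNext()) {
            output.add(rows.next());
            return;
        }
        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
            // signal to the source that we have reached the end of the data.
            context.signalNoMoreElement();
        }
    }
}

public class BoundedReaderSketch {
    public static void main(String[] args) {
        ReaderContext context = new ReaderContext(Boundedness.BOUNDED);
        ListReader reader = new ListReader(List.of("a", "b"), context);
        List<String> output = new ArrayList<>();
        // Keep polling until the reader reports the end of the data.
        while (!context.isFinished()) {
            reader.pollNext(output);
        }
        System.out.println(output);
    }
}
```

With `Boundedness.UNBOUNDED` the reader in this sketch never signals completion, which mirrors the statement that stream processing has no such requirement.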
@@ -208,8 +213,7 @@ It is recommended to put it in the same directory as the implementation class of
- `factoryIdentifier` is used to indicate the name of the current Factory. This value should be the same as the
value returned by `getPluginName`, so that if Factory is used to create Source/Sink in the future,
a seamless switch can be achieved.
- `createSink` and `createSource` are the methods for creating Source and Sink respectively,
and do not need to be implemented at present.
- `createSink` and `createSource` are the methods for creating Sink and Source respectively.
- `optionRule` returns the parameter logic, which is used to indicate which parameters of our connector are supported,
which parameters are required, which parameters are optional, and which parameters are exclusive, which parameters are bundledRequired.
This method will be used when we visually create the connector logic, and it will also be used to generate a complete parameter
26 changes: 15 additions & 11 deletions seatunnel-connectors-v2/README.zh.md
@@ -10,7 +10,7 @@ To decouple SeaTunnel from the compute engine, a new connector API was designed, through
### **Project structure**

- ../`seatunnel-connectors-v2` connector-v2 code implementation
- ../`seatunnel-translation` translation layer for connector-v2
- ../`seatunnel-translation` translation layer for connector-v2
- ../`seatunnel-transform-v2` transform-v2 code implementation
- ../seatunnel-e2e/`seatunnel-connector-v2-e2e` connector-v2 end-to-end tests
- ../seatunnel-examples/`seatunnel-flink-connector-v2-example` example of running seatunnel connector-v2 locally on Flink
Expand Down Expand Up @@ -39,14 +39,19 @@ SeaTunnel为与计算引擎进行解耦,设计了新的连接器API,通过

3.新建两个package分别对应source和sink

​ package org.apache.seatunnel.connectors.seatunnel.{连接器名}.source

​ package org.apache.seatunnel.connectors.seatunnel.{连接器名}.sink
package org.apache.seatunnel.connectors.seatunnel.{连接器名}.source
package org.apache.seatunnel.connectors.seatunnel.{连接器名}.sink

4.将连接器信息添加到在项目根目录的plugin-mapping.properties文件中.

5.将连接器添加到seatunnel-dist/pom.xml,这样连接器jar就可以在二进制包中找到.

6.source端有几个必须实现的类,分别是{连接器名}Source、{连接器名}SourceFactor、{连接器名}SourceReader,具体可以参考其他连接器

7.{连接器名}SourceFactory 里面需要在类名上标注 **@AutoService(Factory.class)** 注解,并且除了必须实现的方法外,需要额外再重写一个 createSource 方法

8.{连接器名}Source 需要重写 getProducedCatalogTables 方法

### 启动类

和老的启动类分开,我们创建了两个新的启动类工程,分别是`seatunnel-core/seatunnel-flink-starter``seatunnel-core/seatunnel-spark-starter`.
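@@ -103,7 +108,7 @@ To decouple SeaTunnel from the compute engine, a new connector API was designed, through
call `SourceReader.Context.signalNoMoreElement`
to notify SeaTunnel that there is no more data to read; these 100 records can then be used for batch processing. Stream processing has no such requirement, so most SourceReaders that support both stream and batch processing contain code like the following: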

```java
```
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
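@@ -154,13 +159,12 @@ Depending on the component's properties, a Sink can choose whether to implement only `SinkCommitter` or `
To create a Source or Sink automatically, the connector must be able to declare and return the list of parameters needed to create it and the validation rules for each parameter. To achieve this, we defined TableSourceFactory and TableSinkFactory.
It is recommended to put them in the same directory as the implementation class of SeaTunnelSource or SeaTunnelSink so that they are easy to find.

- `factoryIdentifier` indicates the name of the current Factory. This value should be the same as the value returned by `getPluginName`, so that if the Factory is later used to create the Source/Sink,
a seamless switch can be achieved.
- `createSink` and `createSource` are the methods for creating Source and Sink respectively, and do not need to be implemented at present.
- `factoryIdentifier` indicates the name of the current Factory. This value should be the same as the value returned by `getPluginName`, so that if the Factory is later used to create the Source/Sink, a seamless switch can be achieved.
- `createSink` and `createSource` are the methods for creating Sink and Source respectively.
- `optionRule` returns the parameter logic, which indicates which parameters the connector supports, which are required, which are optional, which are mutually exclusive (exclusive), and which are bundled (bundledRequired).
This method is used when we visually create the connector logic, and it is also used to generate a complete parameter object from the parameters configured by the user, so connector developers do not have to check in the Config whether each parameter exists and can use them directly.
You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. Many Sources support configuring a Schema, so a common Option is used;
if a Schema is needed, you can reference `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
  This method is used when we visually create the connector logic, and it is also used to generate a complete parameter object from the parameters configured by the user, so connector developers do not have to check in the Config whether each parameter exists and can use them directly.
  You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. Many Sources support configuring a Schema, so a common Option is used;
  if a Schema is needed, you can reference `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.

Don't forget to add `@AutoService(Factory.class)` to the class. This Factory is the parent of TableSourceFactory and TableSinkFactory.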
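
The idea behind `optionRule`, declaring parameter rules once so that connector code does not check each parameter by hand, can be illustrated with a small stand-alone sketch. `RuleSketch` is a hypothetical class written for this example and is not SeaTunnel's `OptionRule`. The semantics used here are assumptions: "exclusive" is read as exactly one of the listed options being set, and "bundled" as all of them or none.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a declarative parameter rule; not SeaTunnel's OptionRule.
class RuleSketch {
    private final List<String> required;
    private final List<String> exclusive; // assumed: exactly one must be set
    private final List<String> bundled;   // assumed: all of them or none

    RuleSketch(List<String> required, List<String> exclusive, List<String> bundled) {
        this.required = required;
        this.exclusive = exclusive;
        this.bundled = bundled;
    }

    // Returns human-readable violations; an empty list means the config is valid.
    List<String> validate(Map<String, String> config) {
        List<String> errors = new ArrayList<>();
        for (String key : required) {
            if (!config.containsKey(key)) {
                errors.add("missing required option: " + key);
            }
        }
        long exclusiveSet = exclusive.stream().filter(config::containsKey).count();
        if (!exclusive.isEmpty() && exclusiveSet != 1) {
            errors.add("exactly one of " + exclusive + " must be set");
        }
        long bundledSet = bundled.stream().filter(config::containsKey).count();
        if (bundledSet != 0 && bundledSet != bundled.size()) {
            errors.add("options " + bundled + " must be set together");
        }
        return errors;
    }
}

public class OptionRuleSketch {
    public static void main(String[] args) {
        RuleSketch rule = new RuleSketch(
                List.of("hosts"),
                List.of("query", "table"),
                List.of("username", "password"));
        // "password" is missing, so the bundled rule is violated.
        Map<String, String> config = Map.of(
                "hosts", "localhost:9200",
                "query", "select 1",
                "username", "admin");
        System.out.println(rule.validate(config));
    }
}
```

Because the rules are data rather than scattered `if` checks, the same declaration could in principle drive both validation and a visual configuration form, which is the motivation the text gives for `optionRule`.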
