Skip to content

Commit

Permalink
[Feature][Connector-V2] Add web3j source connector (#6598)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Jia Fan <[email protected]>
  • Loading branch information
ic4y and Hisoka-X authored May 11, 2024
1 parent fc39390 commit b7002bf
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 0 deletions.
61 changes: 61 additions & 0 deletions docs/en/connector-v2/source/Web3j.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Web3j

> Web3j source connector
## Support Those Engines

> Spark<br/>
> Flink<br/>
> Seatunnel Zeta<br/>
## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Description

Source connector for web3j. It is used to read data from the blockchain, such as block information, transactions, smart contract events, etc. Currently, it supports reading block height data.

## Source Options

| Name | Type | Required | Default | Description |
|------|--------|----------|---------|---------------------------------------------------------------------------------------------------------|
| url | String | Yes | - | When using Infura as the service provider, the URL is used for communication with the Ethereum network. |

## How to Create a Http Data Synchronization Jobs

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Web3j {
url = "https://mainnet.infura.io/v3/xxxxx"
}
}
# Console printing of the read Http data
sink {
Console {
parallelism = 1
}
}
```

Then you will get the following data:

```json
{"blockNumber":19525949,"timestamp":"2024-03-27T13:28:45.605Z"}
```

## Changelog

- Add Web3j Source Connector

1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
seatunnel.sink.S3Redshift = connector-s3-redshift
seatunnel.source.Web3j = connector-web3j
seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
seatunnel.source.Persistiq = connector-http-persistiq
Expand Down
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- [Hbase] Add hbase sink connector #4049
- [Clickhouse] Fix clickhouse old version compatibility #5326
- [Easysearch] Support INFINI Easysearch #5933
- [Web3j] add Web3j source connector #6598
### Formats
- [Canal]Support read canal format message #3950
- [Debezium]Support debezium canal format message #3981
Expand Down
54 changes: 54 additions & 0 deletions seatunnel-connectors-v2/connector-web3j/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-web3j</artifactId>
<name>SeaTunnel : Connectors V2 : Web3j</name>

<properties>
<web3j.version>4.8.4</web3j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.web3j</groupId>
<artifactId>core</artifactId>
<version>${web3j.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class Web3jConfig {

public static final Option<String> URL =
Options.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"your infura project url like : https://mainnet.infura.io/v3/xxxxxxxxxxxx");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.source;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

public class Web3jSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private Web3jSourceParameter parameter;
private JobContext jobContext;

public Web3jSource(ReadonlyConfig readonlyConfig) {
this.parameter = new Web3jSourceParameter(readonlyConfig);
}

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
? Boundedness.BOUNDED
: Boundedness.UNBOUNDED;
}

@Override
public String getPluginName() {
return "Web3j";
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(
CatalogTable.of(
TableIdentifier.of("Web3j", TablePath.DEFAULT),
TableSchema.builder()
.column(
PhysicalColumn.of(
"value", BasicType.STRING_TYPE, 0L, true, null, ""))
.build(),
new HashMap<>(),
new ArrayList<>(),
""));
}

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
SingleSplitReaderContext readerContext) throws Exception {
return new Web3jSourceReader(this.parameter, readerContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;

import com.google.auto.service.AutoService;

import java.io.Serializable;

import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;

@AutoService(Factory.class)
public class Web3jSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
return "Web3j";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(URL).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return Web3jSource.class;
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
return () -> (SeaTunnelSource<T, SplitT, StateT>) new Web3jSource(context.getOptions());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import java.io.Serializable;

import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;

public class Web3jSourceParameter implements Serializable {
private final String url;

public String getUrl() {
return url;
}

public Web3jSourceParameter(ReadonlyConfig config) {
this.url = config.get(URL);
}
}
Loading

0 comments on commit b7002bf

Please sign in to comment.