diff --git a/docs/en/connector-v2/source/Web3j.md b/docs/en/connector-v2/source/Web3j.md
new file mode 100644
index 00000000000..6e50789b419
--- /dev/null
+++ b/docs/en/connector-v2/source/Web3j.md
@@ -0,0 +1,61 @@
+# Web3j
+
+> Web3j source connector
+
+## Support Those Engines
+
+> Spark
+> Flink
+> Seatunnel Zeta
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+Source connector for web3j. It is used to read data from the blockchain, such as block information, transactions, smart contract events, etc. Currently, it supports reading block height data.
+
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|------|--------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | When using Infura as the service provider, the URL is used for communication with the Ethereum network. |
+
+## How to Create a Http Data Synchronization Jobs
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Web3j {
+ url = "https://mainnet.infura.io/v3/xxxxx"
+ }
+}
+
+# Console printing of the read Http data
+sink {
+ Console {
+ parallelism = 1
+ }
+}
+```
+
+Then you will get the following data:
+
+```json
+{"blockNumber":19525949,"timestamp":"2024-03-27T13:28:45.605Z"}
+```
+
+## Changelog
+
+- Add Web3j Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 3ea8bfc7f7c..c880a8fdf22 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -105,6 +105,7 @@ seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
seatunnel.sink.S3Redshift = connector-s3-redshift
+seatunnel.source.Web3j = connector-web3j
seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
seatunnel.source.Persistiq = connector-http-persistiq
diff --git a/release-note.md b/release-note.md
index 9b41cc96d63..b799df78f74 100644
--- a/release-note.md
+++ b/release-note.md
@@ -18,6 +18,7 @@
- [Hbase] Add hbase sink connector #4049
- [Clickhouse] Fix clickhouse old version compatibility #5326
- [Easysearch] Support INFINI Easysearch #5933
+- [Web3j] add Web3j source connector #6598
### Formats
- [Canal]Support read canal format message #3950
- [Debezium]Support debezium canal format message #3981
diff --git a/seatunnel-connectors-v2/connector-web3j/pom.xml b/seatunnel-connectors-v2/connector-web3j/pom.xml
new file mode 100644
index 00000000000..5226f51c2ab
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ ${revision}
+
+
+ connector-web3j
+ SeaTunnel : Connectors V2 : Web3j
+
+
+ 4.8.4
+
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+
+ org.apache.seatunnel
+ seatunnel-format-json
+ ${project.version}
+
+
+
+ org.web3j
+ core
+ ${web3j.version}
+
+
+
+
diff --git a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
new file mode 100644
index 00000000000..0147d5bf0cd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class Web3jConfig {
+
+ public static final Option URL =
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "your infura project url like : https://mainnet.infura.io/v3/xxxxxxxxxxxx");
+}
diff --git a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
new file mode 100644
index 00000000000..da18409386d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+public class Web3jSource extends AbstractSingleSplitSource {
+ private Web3jSourceParameter parameter;
+ private JobContext jobContext;
+
+ public Web3jSource(ReadonlyConfig readonlyConfig) {
+ this.parameter = new Web3jSourceParameter(readonlyConfig);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(jobContext.getJobMode())
+ ? Boundedness.BOUNDED
+ : Boundedness.UNBOUNDED;
+ }
+
+ @Override
+ public String getPluginName() {
+ return "Web3j";
+ }
+
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ }
+
+ @Override
+ public List getProducedCatalogTables() {
+ return Collections.singletonList(
+ CatalogTable.of(
+ TableIdentifier.of("Web3j", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "value", BasicType.STRING_TYPE, 0L, true, null, ""))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ ""));
+ }
+
+ @Override
+ public AbstractSingleSplitReader createReader(
+ SingleSplitReaderContext readerContext) throws Exception {
+ return new Web3jSourceReader(this.parameter, readerContext);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
new file mode 100644
index 00000000000..89c53fc89dc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;
+
+@AutoService(Factory.class)
+public class Web3jSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Web3j";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(URL).build();
+ }
+
+ @Override
+ public Class extends SeaTunnelSource> getSourceClass() {
+ return Web3jSource.class;
+ }
+
+ @Override
+ public
+ TableSource createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource) new Web3jSource(context.getOptions());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
new file mode 100644
index 00000000000..77aefd6dac1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;
+
+public class Web3jSourceParameter implements Serializable {
+ private final String url;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public Web3jSourceParameter(ReadonlyConfig config) {
+ this.url = config.get(URL);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
new file mode 100644
index 00000000000..52b9507f912
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import org.web3j.protocol.Web3j;
+import org.web3j.protocol.http.HttpService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class Web3jSourceReader extends AbstractSingleSplitReader {
+ private final Web3jSourceParameter parameter;
+ private final SingleSplitReaderContext context;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Web3j web3;
+
+ Web3jSourceReader(Web3jSourceParameter parameter, SingleSplitReaderContext context) {
+ this.parameter = parameter;
+ this.context = context;
+ }
+
+ @Override
+ public void open() throws Exception {
+ web3 = Web3j.build(new HttpService(this.parameter.getUrl()));
+ log.info("connect Web3j server, url:[{}] ", this.parameter.getUrl());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (web3 != null) {
+ web3.shutdown();
+ }
+ }
+
+ @Override
+ public void pollNext(Collector output) throws Exception {
+ web3.ethBlockNumber()
+ .flowable()
+ .subscribe(
+ blockNumber -> {
+ Map data = new HashMap<>();
+ data.put("timestamp", Instant.now().toString());
+ data.put("blockNumber", blockNumber.getBlockNumber());
+
+ String json = OBJECT_MAPPER.writeValueAsString(data);
+
+ output.collect(new SeaTunnelRow(new Object[] {json}));
+
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ context.signalNoMoreElement();
+ }
+ });
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index f69b5920879..d1e5af9ee68 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -75,6 +75,7 @@
connector-amazonsqs
connector-paimon
connector-easysearch
+ connector-web3j
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 6447ff43acb..41cb15cd63b 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -311,6 +311,12 @@
${project.version}
provided
+
+ org.apache.seatunnel
+ connector-web3j
+ ${project.version}
+ provided
+
org.apache.seatunnel
connector-kudu
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
new file mode 100644
index 00000000000..913e2e4a42d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connector-v2-e2e
+ ${revision}
+
+
+ connector-web3j-e2e
+ SeaTunnel : E2E : Connector V2 : Web3j
+
+
+
+
+ org.apache.seatunnel
+ connector-web3j
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ connector-assert
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ connector-console
+ ${project.version}
+ test
+
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
new file mode 100644
index 00000000000..040f0f23c40
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.google.firestore;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+@Disabled("Disabled because it needs your infura project url to run this test")
+public class Web3jIT extends TestSuiteBase implements TestResource {
+
+ private static final String FIRESTORE_CONF_FILE = "/firestore/web3j_to_assert.conf";
+
+ @TestTemplate
+ public void testWeb3j(TestContainer container) throws Exception {
+ Container.ExecResult execResult = container.executeJob(FIRESTORE_CONF_FILE);
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @Override
+ public void startUp() throws Exception {}
+
+ @Override
+ public void tearDown() throws Exception {}
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
new file mode 100644
index 00000000000..95d2a2b17a3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Web3j {
+ url = "https://mainnet.infura.io/v3/xxxxxxx"
+ result_table_name = "web3j"
+ }
+}
+
+sink {
+ # This is a example sink plugin **only for test and demonstrate the feature sink plugin**
+ Console {
+ source_table_name = "web3j"
+
+ }
+ Assert {
+ source_table_name = "web3j"
+ rules {
+ field_rules = [
+ {
+ field_name = value
+ field_type = String
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 45c78dbf708..477b0620d28 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -57,6 +57,7 @@
connector-datahub-e2e
connector-mongodb-e2e
connector-hbase-e2e
+ connector-web3j-e2e
connector-maxcompute-e2e
connector-google-firestore-e2e
connector-rocketmq-e2e