diff --git a/docs/en/connector-v2/sink/Socket.md b/docs/en/connector-v2/sink/Socket.md index 46f7aa51ed9..2a29a17c416 100644 --- a/docs/en/connector-v2/sink/Socket.md +++ b/docs/en/connector-v2/sink/Socket.md @@ -16,8 +16,8 @@ Used to send data to Socket Server. Both support streaming and batch mode. | name | type | required | default value | | -------------- |--------|----------|---------------| -| host | String | Yes | - | -| port | Integer| yes | - | +| host | String | Yes | | +| port | Integer| yes | | | max_retries | Integer| No | 3 | | common-options | | no | - | diff --git a/docs/en/connector-v2/source/Socket.md b/docs/en/connector-v2/source/Socket.md index 5521a3e3212..55c3c200f34 100644 --- a/docs/en/connector-v2/source/Socket.md +++ b/docs/en/connector-v2/source/Socket.md @@ -18,9 +18,9 @@ Used to read data from Socket. ## Options | name | type | required | default value | -| -------------- |--------| -------- | ------------- | -| host | String | No | localhost | -| port | Integer| No | 9999 | +| -------------- |--------|----------|---------------| +| host | String | Yes | | +| port | Integer| Yes | | | common-options | | no | - | ### host [string] @@ -103,3 +103,7 @@ spark ### 2.2.0-beta 2022-09-26 - Add Socket Source Connector + +### Next Version + +- `host` and `port` become required ([3317](https://github.com/apache/incubator-seatunnel/pull/3317)) diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java new file mode 100644 index 00000000000..5313e7abbd0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.socket.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class SocketSinkConfigOptions { + private static final int DEFAULT_MAX_RETRIES = 3; + + public static final Option HOST = + Options.key("host") + .stringType() + .noDefaultValue() + .withDescription("socket host"); + + public static final Option PORT = + Options.key("port") + .intType() + .noDefaultValue() + .withDescription("socket port"); + + public static final Option MAX_RETRIES = + Options.key("max_retries") + .intType() + .defaultValue(DEFAULT_MAX_RETRIES) + .withDescription("default value is " + DEFAULT_MAX_RETRIES + ", max retries"); +} diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SinkConfig.java similarity index 63% rename from seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java rename to seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SinkConfig.java index 9e93336b73b..4ebd152103c 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SinkConfig.java @@ -15,7 +15,11 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.socket.config; +package org.apache.seatunnel.connectors.seatunnel.socket.sink; + +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -25,19 +29,15 @@ @Data public class SinkConfig implements Serializable { - public static final String HOST = "host"; - public static final String PORT = "port"; - private static final String MAX_RETRIES = "max_retries"; - private static final int DEFAULT_MAX_RETRIES = 3; private String host; private int port; - private int maxNumRetries = DEFAULT_MAX_RETRIES; + private int maxNumRetries; public SinkConfig(Config config) { - this.host = config.getString(HOST); - this.port = config.getInt(PORT); - if (config.hasPath(MAX_RETRIES)) { - this.maxNumRetries = config.getInt(MAX_RETRIES); + this.host = config.getString(HOST.key()); + this.port = config.getInt(PORT.key()); + if (config.hasPath(MAX_RETRIES.key())) { + this.maxNumRetries = config.getInt(MAX_RETRIES.key()); } } } diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java index 77a38b1c868..3e83044b38f 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.serialization.SerializationSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig; import lombok.extern.slf4j.Slf4j; @@ -66,7 +65,7 @@ public void open() throws IOException { } } - public void wirte(SeaTunnelRow row) throws IOException { + public void write(SeaTunnelRow row) throws IOException { byte[] msg = serializationSchema.serialize(row); try { outputStream.write(msg); diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java index 3277ef51891..1bc88fa5aa5 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.socket.sink; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT; + import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; @@ -28,7 +31,6 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -50,7 +52,7 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { this.pluginConfig = pluginConfig; - CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, SinkConfig.PORT, SinkConfig.HOST); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, PORT.key(), HOST.key()); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); } diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java new file mode 100644 index 00000000000..a57a18fef5f --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.socket.sink; + +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class SocketSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Socket"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(HOST, PORT).optional(MAX_RETRIES).build(); + } +} diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java index 9241c9e57d9..577835927ea 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig; import org.apache.seatunnel.format.json.JsonSerializationSchema; import java.io.IOException; @@ -40,7 +39,7 @@ public class SocketSinkWriter extends AbstractSinkWriter { @Override public void write(SeaTunnelRow element) throws IOException { - socketClient.wirte(element); + socketClient.write(element); } @Override diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java index f97e00e93dd..82fab08bf86 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.socket.source; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT; + import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.source.Boundedness; @@ -25,13 +28,15 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory; import com.google.auto.service.AutoService; @@ -52,7 +57,11 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { - this.parameter = ConfigBeanFactory.create(pluginConfig, SocketSourceParameter.class); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, PORT.key(), HOST.key()); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + this.parameter = new SocketSourceParameter(pluginConfig); } @Override @@ -66,7 +75,8 @@ public SeaTunnelDataType getProducedType() { } @Override - public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) + throws Exception { return new SocketSourceReader(this.parameter, readerContext); } } diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java new file mode 100644 index 00000000000..a9a1fc2f96c --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.socket.source; + +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class SocketSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return "Socket"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(HOST, PORT).build(); + } +} diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java index dbbc7eca951..09d0017872b 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java @@ -17,31 +17,39 @@ package org.apache.seatunnel.connectors.seatunnel.socket.source; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import org.apache.commons.lang3.StringUtils; import java.io.Serializable; import java.util.Objects; public class SocketSourceParameter implements Serializable { - - private static final String DEFAULT_HOST = "localhost"; - private static final int DEFAULT_PORT = 9999; private String host; private Integer port; public String getHost() { - return StringUtils.isBlank(host) ? DEFAULT_HOST : host; - } - - public void setHost(String host) { - this.host = host; + return StringUtils.isBlank(host) ? HOST.defaultValue() : host; } public Integer getPort() { - return Objects.isNull(port) ? DEFAULT_PORT : port; + return Objects.isNull(port) ? PORT.defaultValue() : port; } - public void setPort(Integer port) { - this.port = port; + public SocketSourceParameter(Config config) { + if (config.hasPath(HOST.key())) { + this.host = config.getString(HOST.key()); + } else { + this.host = HOST.defaultValue(); + } + + if (config.hasPath(PORT.key())) { + this.port = config.getInt(PORT.key()); + } else { + this.port = PORT.defaultValue(); + } } }