Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Socket Connector V2] Add Socket Connector Option Rules #3317

Merged
merged 14 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/Socket.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Used to send data to Socket Server. Both support streaming and batch mode.

| name | type | required | default value |
| -------------- |--------|----------|---------------|
| host | String | Yes | - |
| port | Integer| yes | - |
| host | String | Yes | |
| port | Integer| yes | |
| max_retries | Integer| No | 3 |
| common-options | | no | - |

Expand Down
10 changes: 7 additions & 3 deletions docs/en/connector-v2/source/Socket.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Used to read data from Socket.
## Options

| name | type | required | default value |
| -------------- |--------| -------- | ------------- |
| host | String | No | localhost |
| port | Integer| No | 9999 |
| -------------- |--------|----------|---------------|
| host | String | Yes | |
| port | Integer| Yes | |
| common-options | | no | - |

### host [string]
Expand Down Expand Up @@ -103,3 +103,7 @@ spark
### 2.2.0-beta 2022-09-26

- Add Socket Source Connector

### Next Version

- `host` and `port` become required ([3317](https://github.com/apache/incubator-seatunnel/pull/3317))
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.socket.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class SocketSinkConfigOptions {
private static final int DEFAULT_MAX_RETRIES = 3;

public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("socket host");

public static final Option<Integer> PORT =
Options.key("port")
.intType()
.noDefaultValue()
.withDescription("socket port");

public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries")
.intType()
.defaultValue(DEFAULT_MAX_RETRIES)
.withDescription("default value is " + DEFAULT_MAX_RETRIES + ", max retries");
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.socket.config;
package org.apache.seatunnel.connectors.seatunnel.socket.sink;

import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -25,19 +29,15 @@

@Data
public class SinkConfig implements Serializable {
public static final String HOST = "host";
public static final String PORT = "port";
private static final String MAX_RETRIES = "max_retries";
private static final int DEFAULT_MAX_RETRIES = 3;
private String host;
private int port;
private int maxNumRetries = DEFAULT_MAX_RETRIES;
private int maxNumRetries;

public SinkConfig(Config config) {
this.host = config.getString(HOST);
this.port = config.getInt(PORT);
if (config.hasPath(MAX_RETRIES)) {
this.maxNumRetries = config.getInt(MAX_RETRIES);
this.host = config.getString(HOST.key());
this.port = config.getInt(PORT.key());
if (config.hasPath(MAX_RETRIES.key())) {
this.maxNumRetries = config.getInt(MAX_RETRIES.key());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -66,7 +65,7 @@ public void open() throws IOException {
}
}

public void wirte(SeaTunnelRow row) throws IOException {
public void write(SeaTunnelRow row) throws IOException {
byte[] msg = serializationSchema.serialize(row);
try {
outputStream.write(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.socket.sink;

import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
Expand All @@ -28,7 +31,6 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -50,7 +52,7 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, SinkConfig.PORT, SinkConfig.HOST);
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, PORT.key(), HOST.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.socket.sink;

import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class SocketSinkFactory implements TableSinkFactory {
EricJoy2048 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public String factoryIdentifier() {
return "Socket";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(MAX_RETRIES).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import java.io.IOException;
Expand All @@ -40,7 +39,7 @@ public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

@Override
public void write(SeaTunnelRow element) throws IOException {
socketClient.wirte(element);
socketClient.write(element);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.socket.source;

import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.Boundedness;
Expand All @@ -25,13 +28,15 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;

import com.google.auto.service.AutoService;

Expand All @@ -52,7 +57,11 @@ public String getPluginName() {

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.parameter = ConfigBeanFactory.create(pluginConfig, SocketSourceParameter.class);
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, PORT.key(), HOST.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
this.parameter = new SocketSourceParameter(pluginConfig);
}

@Override
Expand All @@ -66,7 +75,8 @@ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
}

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext)
throws Exception {
return new SocketSourceReader(this.parameter, readerContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.socket.source;

import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class SocketSourceFactory implements TableSourceFactory {
EricJoy2048 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public String factoryIdentifier() {
return "Socket";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,39 @@

package org.apache.seatunnel.connectors.seatunnel.socket.source;

import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.Objects;

public class SocketSourceParameter implements Serializable {

private static final String DEFAULT_HOST = "localhost";
private static final int DEFAULT_PORT = 9999;
private String host;
private Integer port;

public String getHost() {
return StringUtils.isBlank(host) ? DEFAULT_HOST : host;
}

public void setHost(String host) {
this.host = host;
return StringUtils.isBlank(host) ? HOST.defaultValue() : host;
}

public Integer getPort() {
return Objects.isNull(port) ? DEFAULT_PORT : port;
return Objects.isNull(port) ? PORT.defaultValue() : port;
}

public void setPort(Integer port) {
this.port = port;
public SocketSourceParameter(Config config) {
if (config.hasPath(HOST.key())) {
this.host = config.getString(HOST.key());
} else {
this.host = HOST.defaultValue();
}

if (config.hasPath(PORT.key())) {
this.port = config.getInt(PORT.key());
} else {
this.port = PORT.defaultValue();
}
}
}