From e6c51a95c77542b051af4ac2caaf8cff67213666 Mon Sep 17 00:00:00 2001 From: lizhenglei <127465317+jackyyyyyssss@users.noreply.github.com> Date: Sun, 18 Feb 2024 11:15:24 +0800 Subject: [PATCH] [Improve][Connector-V2]Support multi-table sink feature for httpsink (#6316) --- .../seatunnel/http/sink/HttpSink.java | 4 +- .../seatunnel/http/sink/HttpSinkWriter.java | 4 +- .../seatunnel/e2e/connector/http/HttpIT.java | 16 ++++ .../test/resources/fake_to_multitable.conf | 88 +++++++++++++++++++ .../src/test/resources/mockserver-config.json | 24 +++++ 5 files changed, 134 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java index 1cf22b01645..da1cb0a8dad 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; @@ -36,7 +37,8 @@ import java.util.Map; import java.util.stream.Collectors; -public class HttpSink extends AbstractSimpleSink { +public class HttpSink extends AbstractSimpleSink + implements SupportMultiTableSink { protected final HttpParameter httpParameter = new HttpParameter(); protected SeaTunnelRowType seaTunnelRowType; protected Config pluginConfig; diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java index dc1790733d0..0333b8f37a1 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.http.sink; import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; @@ -32,7 +33,8 @@ import java.util.Objects; @Slf4j -public class HttpSinkWriter extends AbstractSinkWriter { +public class HttpSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { protected final HttpClientProvider httpClient; protected final SeaTunnelRowType seaTunnelRowType; protected final HttpParameter httpParameter; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java index 405bc5157f8..9dc38cbd1ce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java @@ -19,7 +19,9 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -44,6 +46,8 @@ public class HttpIT extends TestSuiteBase implements TestResource { private static final String TMP_DIR = "/tmp"; + private static final String successCount = "Total Write Count : 2"; + private static final String IMAGE = "mockserver/mockserver:5.14.0"; private GenericContainer mockserverContainer; @@ -162,6 +166,18 @@ public void testSourceToAssertSink(TestContainer container) Assertions.assertEquals(0, execResult18.getExitCode()); } + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + @TestTemplate + public void testMultiTableHttp(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/fake_to_multitable.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertTrue(execResult.getStdout().contains(successCount)); + } + public String getMockServerConfig() { return "/mockserver-config.json"; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf new file mode 100644 index 00000000000..7ed2ea8a593 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "http_sink_1" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + }, + { + schema = { + table = "http_sink_2" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } + ] + } +} + + + +sink { + Http { + url = "http://mockserver:1080/example/httpMultiTableContentSink" + headers { + token = "9e32e859ef044462a257e1fc76730066" + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json index 4ce23c4acb0..42d000f713f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json @@ -4720,5 +4720,29 @@ "Content-Type": "application/json" } } + }, + { + "httpRequest": { + "path": "/example/httpMultiTableContentSink", + "method": "POST", + "headers": { + "token": ["9e32e859ef044462a257e1fc76730066"] + } + }, + "httpResponse": { + "body": [ + { + "name": "httpMultiTableContentSink", + "age": 18 + }, + { + "name": "pizz2", + "age": 19 + } + ], + "headers": { + "Content-Type": "application/json" + } + } } ]