Skip to content

Commit

Permalink
[Improve][Connector-V2]Support multi-table sink feature for httpsink (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyyyyyssss authored Feb 18, 2024
1 parent eee881a commit e6c51a9
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
Expand All @@ -36,7 +37,8 @@
import java.util.Map;
import java.util.stream.Collectors;

public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
protected final HttpParameter httpParameter = new HttpParameter();
protected SeaTunnelRowType seaTunnelRowType;
protected Config pluginConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
Expand All @@ -32,7 +33,8 @@
import java.util.Objects;

@Slf4j
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
protected final HttpClientProvider httpClient;
protected final SeaTunnelRowType seaTunnelRowType;
protected final HttpParameter httpParameter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -44,6 +46,8 @@ public class HttpIT extends TestSuiteBase implements TestResource {

private static final String TMP_DIR = "/tmp";

private static final String successCount = "Total Write Count : 2";

private static final String IMAGE = "mockserver/mockserver:5.14.0";

private GenericContainer<?> mockserverContainer;
Expand Down Expand Up @@ -162,6 +166,18 @@ public void testSourceToAssertSink(TestContainer container)
Assertions.assertEquals(0, execResult18.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK/FLINK do not support multiple table read")
@TestTemplate
public void testMultiTableHttp(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/fake_to_multitable.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(execResult.getStdout().contains(successCount));
}

public String getMockServerConfig() {
return "/mockserver-config.json";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}


source {
FakeSource {
tables_configs = [
{
schema = {
table = "http_sink_1"
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
},
{
schema = {
table = "http_sink_2"
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
}
]
}
}



sink {
Http {
url = "http://mockserver:1080/example/httpMultiTableContentSink"
headers {
token = "9e32e859ef044462a257e1fc76730066"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4720,5 +4720,29 @@
"Content-Type": "application/json"
}
}
},
{
"httpRequest": {
"path": "/example/httpMultiTableContentSink",
"method": "POST",
"headers": {
"token": ["9e32e859ef044462a257e1fc76730066"]
}
},
"httpResponse": {
"body": [
{
"name": "httpMultiTableContentSink",
"age": 18
},
{
"name": "pizz2",
"age": 19
}
],
"headers": {
"Content-Type": "application/json"
}
}
}
]

0 comments on commit e6c51a9

Please sign in to comment.