Skip to content

Commit

Permalink
[Fix][Connector-V2][ClickHouse] Fix ClickHouse Bug (#7897)
Browse files Browse the repository at this point in the history
1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally
[(#7897)](#7897)

2、Added E2E test cases for this issue [(#7897)](#7897)

3、Local developers want to observe **Job Progress Information** in a timely manner,  Need to modify the following configuration.The configuration in config is invalid
```
seatunnel engine/seatunnel-engineer-common/src/main/resources/seatunnely.yaml
```
  • Loading branch information
YOMO-Lee committed Oct 25, 2024
1 parent 4276681 commit 1b80667
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
Expand All @@ -30,12 +29,14 @@
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

@Slf4j
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {

Expand Down Expand Up @@ -120,7 +121,9 @@ public void addSplits(List<ClickhouseSourceSplit> splits) {
}

@Override
public void handleNoMoreSplits() {noMoreSplit = true;}
public void handleNoMoreSplits() {
noMoreSplit = true;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public void testClickhouse(TestContainer container) throws Exception {
clearSinkTable();
}

@TestTemplate
public void testSourceParallelism(TestContainer container) throws Exception {
System.out.println("=========多并行度测试===========");
Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf");
System.out.println(execResult.getExitCode());
}

@BeforeAll
@Override
public void startUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 2
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from source_table"
username = "default"
password = ""
result_table_name = "source_table"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
}

sink {
console {
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
}

0 comments on commit 1b80667

Please sign in to comment.