[Feature][Zeta] Added the metrics information of table statistics in multi-table mode (apache#7212)
hawk9821 authored Jul 26, 2024
1 parent 876d2f0 commit d003bd8
Showing 9 changed files with 537 additions and 15 deletions.
@@ -25,6 +25,7 @@
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

@@ -149,6 +150,10 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters));
}

public List<TablePath> getSinkTables() {
return sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
}

@Override
public Optional<Serializer<MultiTableAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
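For context on the new accessor above: getSinkTables() exposes the TablePath of every table the multi-table sink writes to, which is what lets the engine attribute counts per table. Below is a minimal sketch of deriving per-table metric names from that list, assuming the "<metric>#<table path>" naming the tests further down assert on, and assuming TablePath#getFullName() renders the path as e.g. "fake.table1"; the helper is illustrative, not the engine's actual wiring.

import org.apache.seatunnel.api.table.catalog.TablePath;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;

public class PerTableMetricNames {
    // Illustrative only: build names such as "SinkWriteCount#fake.table1"
    // from the list returned by getSinkTables(). Assumes TablePath#getFullName()
    // yields the dotted path used in the metric suffix.
    public static List<String> sinkWriteCountNames(List<TablePath> sinkTables) {
        return sinkTables.stream()
                .map(table -> SINK_WRITE_COUNT + "#" + table.getFullName())
                .collect(Collectors.toList());
    }
}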
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.e2e;

import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;

public class MultiTableMetricsIT {

private static final String HOST = "http://localhost:";

private static ClientJobProxy batchJobProxy;

private static HazelcastInstanceImpl node1;

private static SeaTunnelClient engineClient;

@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(testClusterName);
engineClient = new SeaTunnelClient(clientConfig);

String batchFilePath = TestUtils.getResource("batch_fake_multi_table_to_console.conf");
JobConfig batchConf = new JobConfig();
batchConf.setName("batch_fake_multi_table_to_console");
ClientJobExecutionEnvironment batchJobExecutionEnv =
engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig);
batchJobProxy = batchJobExecutionEnv.execute();
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED, batchJobProxy.getJobStatus()));
}

@Test
public void multiTableMetrics() {
Collections.singletonList(node1)
.forEach(
instance -> {
given().get(
HOST
+ instance.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.JOB_INFO_URL
+ "/"
+ batchJobProxy.getJobId())
.then()
.statusCode(200)
.body("jobName", equalTo("batch_fake_multi_table_to_console"))
.body("jobStatus", equalTo("FINISHED"))
.body("metrics.SourceReceivedCount", equalTo("50"))
.body("metrics.SinkWriteCount", equalTo("50"))
.body(
"metrics.TableSourceReceivedCount.'fake.table1'",
equalTo("20"))
.body(
"metrics.TableSourceReceivedCount.'fake.public.table2'",
equalTo("30"))
.body(
"metrics.TableSinkWriteCount.'fake.table1'",
equalTo("20"))
.body(
"metrics.TableSinkWriteCount.'fake.public.table2'",
equalTo("30"));
});
}

@AfterEach
void afterClass() {
if (engineClient != null) {
engineClient.close();
}

if (node1 != null) {
node1.shutdown();
}
}
}
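The integration test above pins down where the new per-table figures land in the job-info REST response: alongside the existing totals (metrics.SourceReceivedCount, metrics.SinkWriteCount), the response carries metrics.TableSourceReceivedCount and metrics.TableSinkWriteCount maps keyed by table path. A minimal sketch of reading those fields with the shaded Jackson classes already used elsewhere in this commit; responseBody is a hypothetical String holding the JSON returned by the same endpoint the test calls.

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

public class JobInfoMetricsReader {
    // Illustrative only: pull the totals and the per-table breakdown out of
    // the job-info JSON body asserted on in the test above.
    public static void printSinkCounts(String responseBody) throws Exception {
        JsonNode metrics = new ObjectMapper().readTree(responseBody).get("metrics");
        System.out.println(metrics.get("SinkWriteCount").asText());                                // "50"
        System.out.println(metrics.get("TableSinkWriteCount").get("fake.table1").asText());        // "20"
        System.out.println(metrics.get("TableSinkWriteCount").get("fake.public.table2").asText()); // "30"
    }
}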
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
# This is an example source plugin **only for testing and demonstrating the feature source plugin**
FakeSource {
result_table_name = "fake1"
row.num = 20
schema = {
table = "fake.table1"
fields {
name = "string"
age = "int"
}
}
}

FakeSource {
result_table_name = "fake2"
row.num = 30
schema = {
table = "fake.public.table2"
fields {
name = "string"
age = "int"
sex = "int"
}
}
}
}

transform {
}

sink {
console {
source_table_name = "fake1"
}
console {
source_table_name = "fake2"
}
}
@@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.client;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.common.config.Common;
@@ -51,10 +53,14 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
@@ -548,6 +554,114 @@ public void testSavePointAndRestoreWithSavePoint() throws Exception {
}
}

@Test
public void testGetMultiTableJobMetrics() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/batch_fake_multi_table_to_console.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("testGetMultiTableJobMetrics");

SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
JobClient jobClient = seaTunnelClient.getJobClient();

try {
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(
() -> {
return clientJobProxy.waitForJobComplete();
});
long jobId = clientJobProxy.getJobId();

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
jobClient.getJobDetailStatus(jobId).contains("FINISHED")
&& jobClient
.listJobStatus(true)
.contains("FINISHED")));

String jobMetrics = jobClient.getJobMetrics(jobId);

Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.table1"));
Assertions.assertTrue(
jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2"));

log.info("jobMetrics : {}", jobMetrics);
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
List<String> metricNameList =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
jobMetricsStr.fieldNames(), 0),
false)
.filter(
metricName ->
metricName.startsWith(SOURCE_RECEIVED_COUNT)
|| metricName.startsWith(SINK_WRITE_COUNT))
.collect(Collectors.toList());

Map<String, Long> totalCount =
metricNameList.stream()
.filter(metrics -> !metrics.contains("#"))
.collect(
Collectors.toMap(
metrics -> metrics,
metrics ->
StreamSupport.stream(
jobMetricsStr
.get(metrics)
.spliterator(),
false)
.mapToLong(
value ->
value.get("value")
.asLong())
.sum()));

Map<String, Long> tableCount =
metricNameList.stream()
.filter(metrics -> metrics.contains("#"))
.collect(
Collectors.toMap(
metrics -> metrics,
metrics ->
StreamSupport.stream(
jobMetricsStr
.get(metrics)
.spliterator(),
false)
.mapToLong(
value ->
value.get("value")
.asLong())
.sum()));

Assertions.assertEquals(
totalCount.get(SOURCE_RECEIVED_COUNT),
tableCount.entrySet().stream()
.filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_COUNT))
.mapToLong(Map.Entry::getValue)
.sum());
Assertions.assertEquals(
totalCount.get(SINK_WRITE_COUNT),
tableCount.entrySet().stream()
.filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT))
.mapToLong(Map.Entry::getValue)
.sum());

} catch (ExecutionException | InterruptedException | JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
}
}

@AfterAll
public static void after() {
INSTANCE.shutdown();
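As testGetMultiTableJobMetrics shows, the client-side job metrics expose the per-table breakdown by suffixing the metric name with '#' and the table path, e.g. SourceReceivedCount#fake.table1, while names without a suffix remain the job-wide totals. A minimal sketch of splitting such a name back into its parts; the helper name is made up for illustration.

public class MetricNameParser {
    // Illustrative only: "SinkWriteCount#fake.public.table2" ->
    // ["SinkWriteCount", "fake.public.table2"]; names without '#' are job-wide totals.
    public static String[] splitPerTableMetricName(String metricName) {
        int idx = metricName.indexOf('#');
        return idx < 0
                ? new String[] {metricName, null}
                : new String[] {metricName.substring(0, idx), metricName.substring(idx + 1)};
    }
}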