From 2cc82bd4dc491c23e6c60a55a58c64fbaa9a1b95 Mon Sep 17 00:00:00 2001 From: Guangdong Liu <804167098@qq.com> Date: Mon, 3 Jun 2024 18:40:57 +0800 Subject: [PATCH] [Feature][RestAPI] Add overview api (#6883) --- docs/en/seatunnel-engine/rest-api.md | 20 ++++ docs/zh/seatunnel-engine/rest-api.md | 20 ++++ pom.xml | 7 ++ .../seatunnel/engine/e2e/RestApiIT.java | 20 ++++ .../src/test/resources/seatunnel.yaml | 3 +- .../seatunnel-engine-common/pom.xml | 48 ++++++++ .../seatunnel/engine/common/Constant.java | 2 + .../engine/common/env/EnvironmentUtil.java | 89 ++++++++++++++ .../seatunnel/engine/common/env/Version.java | 29 +++++ .../zeta.version.properties | 22 ++++ .../common/config/EnvironmentUtilTest.java | 40 +++++++ .../AbstractResourceManager.java | 12 ++ .../resourcemanager/ResourceManager.java | 4 + .../opeartion/GetOverviewOperation.java | 110 ++++++++++++++++++ .../resource/OverviewInfo.java | 35 ++++++ .../engine/server/rest/RestConstant.java | 2 + .../rest/RestHttpGetCommandProcessor.java | 42 ++++++- .../ResourceDataSerializerHook.java | 5 + 18 files changed, 505 insertions(+), 5 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java create mode 100644 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java create mode 100644 seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties create mode 100644 seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 0a0c1605876..adde89d89a1 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -35,6 +35,26 @@ network: ## API reference +### Returns an overview over the Zeta engine cluster. + +#### Parameters + +#### Responses + +```json +{ + "projectVersion":"2.3.5-SNAPSHOT", + "gitCommitAbbrev":"DeadD0d0", + "totalSlot":"0", + "unassignedSlot":"0", + "runningJobs":"0", + "finishedJobs":"0", + "failedJobs":"0", + "cancelledJobs":"0", + "works":"1" +} +``` + ### Returns an overview over all jobs and their current state.
diff --git a/docs/zh/seatunnel-engine/rest-api.md b/docs/zh/seatunnel-engine/rest-api.md index ee9a1511a95..e6aecaeb2fa 100644 --- a/docs/zh/seatunnel-engine/rest-api.md +++ b/docs/zh/seatunnel-engine/rest-api.md @@ -34,6 +34,26 @@ network: ## API参考 +### 返回Zeta集群的概览 + +#### 参数 + +#### 响应 + +```json +{ + "projectVersion":"2.3.5-SNAPSHOT", + "gitCommitAbbrev":"DeadD0d0", + "totalSlot":"0", + "unassignedSlot":"0", + "runningJobs":"0", + "finishedJobs":"0", + "failedJobs":"0", + "cancelledJobs":"0", + "works":"1" +} +``` + ### 返回所有作业及其当前状态的概览。
diff --git a/pom.xml b/pom.xml index acff06bc091..50c0d412008 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,7 @@ 1.3.3 3.3.0 3.2.0 + 4.0.4 1.3.0 1.20 2.17.1 @@ -704,6 +705,12 @@ ${maven-helper-plugin.version} + + pl.project13.maven + git-commit-id-plugin + ${maven-git-commit-id-plugin.version} + + org.codehaus.mojo license-maven-plugin diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 3569fb4b11f..37a8d51b761 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -218,6 +218,26 @@ public void testGetJobInfoByJobId() { }); } + @Test + public void testOverview() { + Arrays.asList(node2, node1) + .forEach( + instance -> { + given().get( + HOST + + instance.getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.OVERVIEW) + .then() + .statusCode(200) + .body("projectVersion", notNullValue()) + .body("totalSlot", equalTo("40")) + .body("workers", equalTo("2")); + }); + } + @Test public void testGetRunningThreads() { Arrays.asList(node2, node1) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 7775a483cd7..b643fe16b34 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -22,7 +22,8 @@ seatunnel: queue-type: blockingqueue print-execution-info-interval: 10 slot-service: - dynamic-slot: true + dynamic-slot: false + slot-num: 20 checkpoint: interval: 300000 timeout: 100000 diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml index 824a4aed53e..0de0614dee2 100644 --- a/seatunnel-engine/seatunnel-engine-common/pom.xml +++ b/seatunnel-engine/seatunnel-engine-common/pom.xml @@ -42,4 +42,52 @@ ${project.version} + + + + + false + src/main/resources + + + true + src/main/resources-filtered + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + parse-version + + parse-version + + + + + + pl.project13.maven + git-commit-id-plugin + + false + false + false + + true + + + + + get-the-git-information + + revision + + validate + + + + + diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index 3dc739168b3..fdb21025814 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -60,4 +60,6 @@ public class Constant { public static final Long IMAP_RUNNING_JOB_METRICS_KEY = 1L; public static final String IMAP_CONNECTOR_JAR_REF_COUNTERS = "engine_connectorJarRefCounters"; + + public static final String PROP_FILE = "zeta.version.properties"; } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java new file mode 100644 index 00000000000..262b741cd8b --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/EnvironmentUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.env; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.InputStream; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Properties; + +import static org.apache.seatunnel.engine.common.Constant.PROP_FILE; + +@Slf4j +public class EnvironmentUtil { + + private static String getProperty(Properties properties, String key, String defaultValue) { + String value = properties.getProperty(key); + if (value == null || value.charAt(0) == '$') { + return defaultValue; + } + return value; + } + + public static Version getVersion() { + + Version version = new Version(); + ClassLoader classLoader = EnvironmentUtil.class.getClassLoader(); + + try (InputStream propFile = classLoader.getResourceAsStream(PROP_FILE)) { + + if (propFile != null) { + Properties properties = new Properties(); + + properties.load(propFile); + + version.setProjectVersion( + getProperty(properties, "project.version", version.getProjectVersion())); + version.setGitCommitId( + getProperty(properties, "git.commit.id", version.getGitCommitId())); + version.setGitCommitAbbrev( + getProperty( + properties, "git.commit.id.abbrev", version.getGitCommitAbbrev())); + + DateTimeFormatter gitDateTimeFormatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ"); + + DateTimeFormatter systemDefault = + DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.systemDefault()); + + version.setBuildTime( + systemDefault.format( + gitDateTimeFormatter.parse( + getProperty( + properties, + "git.build.time", + version.getBuildTime())))); + version.setCommitTime( + systemDefault.format( + gitDateTimeFormatter.parse( + getProperty( + properties, + "git.commit.time", + version.getCommitTime())))); + } + + } catch (IOException ioException) { + log.info("Unable to read version property file: {}", ioException.getMessage()); + } + + return version; + } +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java new file mode 100644 index 00000000000..b04dc96f4b6 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/env/Version.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.env; + +import lombok.Data; + +@Data +public class Version { + private String projectVersion = ""; + private String gitCommitId = "DecafC0ffeeD0d0F00d"; + private String buildTime = "1970-01-01T00:00:00+0000"; + private String commitTime = "1970-01-01T00:00:00+0000"; + private String gitCommitAbbrev = "DeadD0d0"; +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties b/seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties new file mode 100644 index 00000000000..72aa40b7949 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources-filtered/zeta.version.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project.version=${project.version} +git.commit.id=${git.commit.id} +git.commit.id.abbrev=${git.commit.id.abbrev} +git.commit.time=${git.commit.time} +git.build.time=${git.build.time} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java new file mode 100644 index 00000000000..0e2c57c2f17 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/EnvironmentUtilTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.config; + +import org.apache.seatunnel.engine.common.env.EnvironmentUtil; +import org.apache.seatunnel.engine.common.env.Version; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class EnvironmentUtilTest { + + @Test + public void testGetVersion() { + + Version version = EnvironmentUtil.getVersion(); + + assertNotNull(version.getProjectVersion()); + assertNotNull(version.getGitCommitId()); + assertNotNull(version.getGitCommitAbbrev()); + assertNotNull(version.getBuildTime()); + assertNotNull(version.getCommitTime()); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index 2caa6e68166..9d9b9d9a76d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -223,4 +223,16 @@ public List getUnassignedSlots() { .flatMap(workerProfile -> Arrays.stream(workerProfile.getUnassignedSlots())) .collect(Collectors.toList()); } + + @Override + public List getAssignedSlots() { + return registerWorker.values().stream() + .flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots())) + .collect(Collectors.toList()); + } + + @Override + public int workerCount() { + return registerWorker.size(); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java index ca668482aac..8a04b21e4ba 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java @@ -60,4 +60,8 @@ CompletableFuture> applyResources( void close(); List getUnassignedSlots(); + + List getAssignedSlots(); + + int workerCount(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java new file mode 100644 index 00000000000..6bc0ef89061 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager.opeartion; + +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState; +import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; +import org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook; + +import com.hazelcast.map.IMap; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.NodeEngine; +import com.hazelcast.spi.impl.operationservice.Operation; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class GetOverviewOperation extends Operation implements IdentifiedDataSerializable { + + private OverviewInfo overviewInfo; + + @Override + public void run() throws Exception { + SeaTunnelServer server = getService(); + + overviewInfo = getOverviewInfo(server, getNodeEngine()); + } + + @Override + public Object getResponse() { + return overviewInfo; + } + + @Override + public int getFactoryId() { + return ResourceDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return ResourceDataSerializerHook.REQUEST_SLOT_INFO_TYPE; + } + + @Override + public String getServiceName() { + return SeaTunnelServer.SERVICE_NAME; + } + + public static OverviewInfo getOverviewInfo(SeaTunnelServer server, NodeEngine nodeEngine) { + OverviewInfo overviewInfo = new OverviewInfo(); + ResourceManager resourceManager = server.getCoordinatorService().getResourceManager(); + + List assignedSlots = resourceManager.getAssignedSlots(); + + List unassignedSlots = resourceManager.getUnassignedSlots(); + IMap finishedJob = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE); + overviewInfo.setTotalSlot(assignedSlots.size() + unassignedSlots.size()); + overviewInfo.setUnassignedSlot(unassignedSlots.size()); + overviewInfo.setRunningJobs( + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO).size()); + overviewInfo.setFailedJobs( + finishedJob.values().stream() + .filter( + jobState -> + jobState.getJobStatus() + .name() + .equals(JobStatus.FAILED.toString())) + .count()); + overviewInfo.setCancelledJobs( + finishedJob.values().stream() + .filter( + jobState -> + jobState.getJobStatus() + .name() + .equals(JobStatus.CANCELED.toString())) + .count()); + overviewInfo.setWorkers(resourceManager.workerCount()); + overviewInfo.setFinishedJobs( + finishedJob.values().stream() + .filter( + jobState -> + jobState.getJobStatus() + .name() + .equals(JobStatus.FINISHED.toString())) + .count()); + + return overviewInfo; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java new file mode 100644 index 00000000000..50479443231 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/OverviewInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager.resource; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class OverviewInfo implements Serializable { + private String projectVersion; + private String gitCommitAbbrev; + private int totalSlot; + private int unassignedSlot; + private long runningJobs; + private long finishedJobs; + private long failedJobs; + private long cancelledJobs; + private int workers; +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 6daa817a48c..72487737033 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -44,6 +44,8 @@ public class RestConstant { public static final String ERROR_MSG = "errorMsg"; public static final String METRICS = "metrics"; + + public static final String OVERVIEW = "/hazelcast/rest/maps/overview"; public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs"; @Deprecated public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; public static final String JOB_INFO_URL = "/hazelcast/rest/maps/job-info"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index b4110f46fff..0e89f9cfda8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -25,6 +25,8 @@ import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.env.EnvironmentUtil; +import org.apache.seatunnel.engine.common.env.Version; import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -37,6 +39,8 @@ import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation; import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation; import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation; +import org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation; +import org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import com.hazelcast.cluster.Address; @@ -53,6 +57,7 @@ import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; +import com.hazelcast.spi.impl.NodeEngineImpl; import java.util.Arrays; import java.util.Comparator; @@ -64,6 +69,7 @@ import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO; import static org.apache.seatunnel.engine.server.rest.RestConstant.JOB_INFO_URL; +import static org.apache.seatunnel.engine.server.rest.RestConstant.OVERVIEW; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_THREADS; @@ -71,12 +77,9 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor { - private final Log4j2HttpGetCommandProcessor original; - private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount"; - private static final String SINK_WRITE_COUNT = "SinkWriteCount"; - + private final Log4j2HttpGetCommandProcessor original; private NodeEngine nodeEngine; public RestHttpGetCommandProcessor(TextCommandService textCommandService) { @@ -106,6 +109,8 @@ public void handle(HttpGetCommand httpGetCommand) { getSystemMonitoringInformation(httpGetCommand); } else if (uri.startsWith(RUNNING_THREADS)) { getRunningThread(httpGetCommand); + } else if (uri.startsWith(OVERVIEW)) { + overView(httpGetCommand); } else { original.handle(httpGetCommand); } @@ -124,6 +129,35 @@ public void handleRejection(HttpGetCommand httpGetCommand) { handle(httpGetCommand); } + public void overView(HttpGetCommand command) { + + Version version = EnvironmentUtil.getVersion(); + + SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true); + + OverviewInfo overviewInfo; + + if (seaTunnelServer == null) { + overviewInfo = + (OverviewInfo) + NodeEngineUtil.sendOperationToMasterNode( + getNode().nodeEngine, new GetOverviewOperation()) + .join(); + overviewInfo.setProjectVersion(version.getProjectVersion()); + overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev()); + } else { + + NodeEngineImpl nodeEngine = this.textCommandService.getNode().getNodeEngine(); + overviewInfo = GetOverviewOperation.getOverviewInfo(seaTunnelServer, nodeEngine); + overviewInfo.setProjectVersion(version.getProjectVersion()); + overviewInfo.setGitCommitAbbrev(version.getGitCommitAbbrev()); + } + + this.prepareResponse( + command, + JsonUtil.toJsonObject(JsonUtils.toMap(JsonUtils.toJsonString(overviewInfo)))); + } + private void getSystemMonitoringInformation(HttpGetCommand command) { Cluster cluster = textCommandService.getNode().hazelcastInstance.getCluster(); nodeEngine = textCommandService.getNode().hazelcastInstance.node.nodeEngine; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java index d229e4cd90b..09f9a4550c0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.engine.server.serializable; import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant; +import org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation; import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation; import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation; import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation; @@ -50,6 +51,8 @@ public class ResourceDataSerializerHook implements DataSerializerHook { public static final int SYNC_SLOT_SERVICE_STATUS_TYPE = 8; + public static final int REQUEST_SLOT_INFO_TYPE = 9; + public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY, @@ -86,6 +89,8 @@ public IdentifiedDataSerializable create(int typeId) { return new SlotAndWorkerProfile(); case SYNC_SLOT_SERVICE_STATUS_TYPE: return new SyncWorkerProfileOperation(); + case REQUEST_SLOT_INFO_TYPE: + return new GetOverviewOperation(); default: throw new IllegalArgumentException("Unknown type id " + typeId); }