Skip to content

Commit

Permalink
[Feature][Zeta] Support delete logs regularly (#7787)
Browse files Browse the repository at this point in the history
  • Loading branch information
corgy-w authored Nov 5, 2024
1 parent 019af39 commit 5089d8a
Show file tree
Hide file tree
Showing 23 changed files with 513 additions and 81 deletions.
2 changes: 2 additions & 0 deletions config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ seatunnel:
telemetry:
metric:
enabled: false
logs:
scheduled-deletion-enable: true
http:
enable-http: true
port: 8080
Expand Down
18 changes: 18 additions & 0 deletions docs/en/seatunnel-engine/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ SeaTunnel provides an API for querying logs.

For more details, please refer to the [REST-API](rest-api-v2.md).

## SeaTunnel Log Configuration

### Scheduled deletion of old logs

SeaTunnel supports scheduled deletion of old log files to prevent disk space exhaustion. You can add the following configuration in the `seatunnel.yaml` file:

```yaml
seatunnel:
engine:
history-job-expire-minutes: 1440
telemetry:
logs:
scheduled-deletion-enable: true
```
- `history-job-expire-minutes`: Sets the retention time for historical job data and logs (in minutes). The system will automatically clear expired job information and log files after the specified period.
- `scheduled-deletion-enable`: Enables scheduled cleanup; the default value is `true`. When a job reaches the expiration time defined by `history-job-expire-minutes`, the system automatically deletes its log files. If this feature is disabled, logs remain on disk permanently and must be managed manually, which may affect disk space usage. Configure this setting based on your specific needs.

## Best practices for developers

You can create an SLF4J logger by calling `org.slf4j.LoggerFactory#LoggerFactory.getLogger` with the Class of your class as an argument.
Expand Down
19 changes: 19 additions & 0 deletions docs/zh/seatunnel-engine/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,25 @@ SeaTunnel 提供了一个 API,用于查询日志。

有关详细信息,请参阅 [REST-API](rest-api-v2.md)

## SeaTunnel 日志配置

### 定时删除旧日志

SeaTunnel 支持定时删除旧日志文件,以避免磁盘空间不足。您可以在 `seatunnel.yaml` 文件中添加以下配置:

```yaml
seatunnel:
engine:
history-job-expire-minutes: 1440
telemetry:
logs:
scheduled-deletion-enable: true
```
- `history-job-expire-minutes`: 设置历史作业和日志的保留时间(单位:分钟)。系统将在指定的时间后自动清除过期的作业信息和日志文件。
- `scheduled-deletion-enable`: 启用定时清理功能,默认为 `true`。系统将在作业达到 `history-job-expire-minutes` 设置的过期时间后自动删除相关日志文件。关闭该功能后,日志将永久保留在磁盘上,需要用户自行管理,否则可能影响磁盘占用。建议根据需求合理配置。


## 开发人员最佳实践

您可以通过调用 `org.slf4j.LoggerFactory#LoggerFactory.getLogger` 并以您的类的类作为参数来创建 SLF4J 记录器。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
Expand All @@ -35,12 +36,15 @@
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

import com.beust.jcommander.internal.Lists;
import com.hazelcast.jet.datamodel.Tuple2;
import io.restassured.response.Response;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand All @@ -52,7 +56,11 @@
public class JobLogIT extends SeaTunnelContainer {

private static final String CUSTOM_JOB_NAME = "test-job-log-file";
private static final String CUSTOM_JOB_NAME2 = "test-job-log-file2";
private static final String CUSTOM_JOB_NAME3 = "test-job-log-file3";
private static final long CUSTOM_JOB_ID = 862969647010611201L;
private static final long CUSTOM_JOB_ID2 = 862969647010611202L;
private static final long CUSTOM_JOB_ID3 = 862969647010611203L;

private static final String confFile = "/fakesource_to_console.conf";
private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL);
Expand Down Expand Up @@ -99,10 +107,29 @@ public void tearDown() throws Exception {
@Test
public void testJobLogFile() throws Exception {
submitJobAndAssertResponse(
server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME, CUSTOM_JOB_ID);
server, JobMode.BATCH.name(), false, CUSTOM_JOB_NAME, CUSTOM_JOB_ID);

submitJobAndAssertResponse(
server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME2, CUSTOM_JOB_ID2);

submitJobAndAssertResponse(
server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME3, CUSTOM_JOB_ID3);

assertConsoleLog();
assertFileLog();
List<Tuple2<Boolean, String>> before =
Lists.newArrayList(
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + ".log"));
assertFileLogClean(before);
Thread.sleep(90000);
List<Tuple2<Boolean, String>> after =
Lists.newArrayList(
Tuple2.tuple2(true, "job-" + CUSTOM_JOB_ID + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + ".log"));
assertFileLogClean(after);
}

private void assertConsoleLog() {
Expand Down Expand Up @@ -168,6 +195,22 @@ private void assertFileLog() throws IOException, InterruptedException {
});
}

/**
 * Verifies, for every expectation, whether a job log file has been removed from both nodes.
 *
 * <p>Each tuple pairs an expected flag with a log file name. The file is searched for under
 * {@code /tmp/seatunnel/logs} on {@code server} and then on {@code secondServer}; the flag must
 * equal "the search printed nothing on both nodes".
 *
 * @param tuple2s expectations as (expected-to-be-absent-on-both-nodes, log file name)
 * @throws IOException if running the command inside a container fails
 * @throws InterruptedException if the thread is interrupted while the command runs
 */
private void assertFileLogClean(List<Tuple2<Boolean, String>> tuple2s)
        throws IOException, InterruptedException {
    for (Tuple2<Boolean, String> expectation : tuple2s) {
        // The same lookup is executed on both nodes, so build the command only once.
        String findCommand = "find /tmp/seatunnel/logs -name " + expectation.f1() + "\n";

        Container.ExecResult firstNodeResult = server.execInContainer("sh", "-c", findCommand);
        String foundOnFirstNode = firstNodeResult.getStdout();

        Container.ExecResult secondNodeResult =
                secondServer.execInContainer("sh", "-c", findCommand);
        String foundOnSecondNode = secondNodeResult.getStdout();

        boolean absentOnBothNodes =
                StringUtils.isBlank(foundOnFirstNode) && StringUtils.isBlank(foundOnSecondNode);
        Assertions.assertEquals(expectation.f0(), absentOnBothNodes);
    }
}

private Response submitJob(
GenericContainer<?> container,
String jobMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

seatunnel:
engine:
history-job-expire-minutes: 1440
history-job-expire-minutes: 1
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
Expand All @@ -35,3 +35,8 @@ seatunnel:
enable-http: true
port: 8080
enable-dynamic-port: false
telemetry:
metric:
enabled: false
logs:
scheduled-deletion-enable: true
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ seatunnel:
namespace: /tmp/seatunnel/checkpoint_snapshot/
http:
enable-http: false
port: 8080
port: 8080
telemetry:
metric:
enabled: false
logs:
scheduled-deletion-enable: true
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;

Expand Down Expand Up @@ -330,17 +331,19 @@ private Map<String, String> parseConnectorJarHAStoragePluginConfig(
}

private TelemetryConfig parseTelemetryConfig(Node telemetryNode) {
TelemetryConfig metricConfig = new TelemetryConfig();
TelemetryConfig telemetryConfig = new TelemetryConfig();
for (Node node : childElements(telemetryNode)) {
String name = cleanNodeName(node);
if (ServerConfigOptions.TELEMETRY_METRIC.key().equals(name)) {
metricConfig.setMetric(parseTelemetryMetricConfig(node));
telemetryConfig.setMetric(parseTelemetryMetricConfig(node));
} else if (ServerConfigOptions.TELEMETRY_LOGS.key().equals(name)) {
telemetryConfig.setLogs(parseTelemetryLogsConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
}

return metricConfig;
return telemetryConfig;
}

private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) {
Expand All @@ -357,6 +360,20 @@ private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) {
return metricConfig;
}

/**
 * Builds the telemetry logs configuration from the child elements of the given node.
 *
 * <p>Only the scheduled-deletion switch is understood; any other child element is reported
 * with a warning and otherwise ignored.
 *
 * @param logsNode the configuration node holding the logs settings
 * @return the populated logs configuration
 */
private TelemetryLogsConfig parseTelemetryLogsConfig(Node logsNode) {
    TelemetryLogsConfig parsed = new TelemetryLogsConfig();
    for (Node child : childElements(logsNode)) {
        String elementName = cleanNodeName(child);
        boolean isDeletionSwitch =
                ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE
                        .key()
                        .equals(elementName);
        if (!isDeletionSwitch) {
            LOGGER.warning("Unrecognized element: " + elementName);
            continue;
        }
        parsed.setEnabled(getBooleanValue(getTextContent(child)));
    }

    return parsed;
}

private HttpConfig parseHttpConfig(Node httpNode) {
HttpConfig httpConfig = new HttpConfig();
for (Node node : childElements(httpNode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ public class ServerConfigOptions {
.withDescription(
"Whether to use classloader cache mode. With cache mode, all jobs share the same classloader if the jars are the same");

/**
 * Switch for the scheduled deletion of job log files; defaults to {@code true}.
 *
 * <p>Keep this declaration above {@link #TELEMETRY_LOGS}: that option's default value is
 * created with {@code new TelemetryLogsConfig()}, and the field initializer of
 * {@code TelemetryLogsConfig} reads this option's {@code defaultValue()}. Static fields are
 * initialized in textual order, so swapping the two declarations would make that read hit a
 * field that is still {@code null}.
 */
public static final Option<Boolean> TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE =
Options.key("scheduled-deletion-enable")
.booleanType()
.defaultValue(true)
.withDescription(
"Enable scheduled cleanup, with default value of true. The system will automatically delete relevant log files when job expiration time, as defined by `history-job-expire-minutes`, is reached. "
+ "If this feature is disabled, logs will remain permanently on disk, requiring manual management, which may affect disk space usage. It is recommended to configure this setting based on specific needs.");

/**
 * The {@code logs} section of the telemetry configuration; defaults to a freshly constructed
 * {@code TelemetryLogsConfig}.
 */
public static final Option<TelemetryLogsConfig> TELEMETRY_LOGS =
Options.key("logs")
.type(new TypeReference<TelemetryLogsConfig>() {})
.defaultValue(new TelemetryLogsConfig())
.withDescription("The telemetry logs configuration.");

public static final Option<Boolean> TELEMETRY_METRIC_ENABLED =
Options.key("enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@
public class TelemetryConfig implements Serializable {

private TelemetryMetricConfig metric = ServerConfigOptions.TELEMETRY_METRIC.defaultValue();

private TelemetryLogsConfig logs = ServerConfigOptions.TELEMETRY_LOGS.defaultValue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.common.config.server;

import lombok.Data;

import java.io.Serializable;

/**
 * Settings of the {@code logs} section of the telemetry configuration.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}, so the field name below determines the
 * accessor names used by the configuration parser.
 */
@Data
public class TelemetryLogsConfig implements Serializable {

// Whether scheduled deletion of log files is enabled. Starts from the default declared on
// ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.
private boolean enabled =
ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.defaultValue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.common.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.builder.api.Component;
import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
import org.apache.logging.log4j.core.lookup.StrSubstitutor;

import java.lang.reflect.Field;

/**
 * Helpers for discovering where the active Log4j2 configuration writes its log files.
 *
 * <p>The lookup casts the active configuration to {@link PropertiesConfiguration} and reads the
 * private {@code appendersComponent} field of {@link BuiltConfiguration} through reflection, so
 * it only works with a properties-based Log4j2 configuration.
 */
public class LogUtil {

    private static final String ROUTING_APPENDER = "routingAppender";
    private static final String FILE_APPENDER = "fileAppender";
    private static final String APPENDERS_FIELD = "appendersComponent";
    private static final String NAME_ATTRIBUTE = "name";
    private static final String FILE_NAME_ATTRIBUTE = "fileName";
    private static final String PATH_SEPARATOR = "/";

    /**
     * Get configuration log path by log4j.
     *
     * <p>The root logger's appender references decide which appender is consulted: the first
     * reference whose text contains {@code routingAppender} or {@code fileAppender} is taken and
     * must be exactly one of those two names. Only the file name of the selected appender is
     * resolved, and the directory is cut from that same file name.
     *
     * @return the directory part of the selected appender's log file name, i.e. everything
     *     before the last {@code /}
     * @throws NoSuchFieldException if the reflective lookup of the appenders component fails
     * @throws IllegalAccessException if the appenders component cannot be read reflectively
     * @throws IllegalArgumentException if the root logger references neither appender, if the
     *     selected appender has no file name, or if the file name contains no {@code /}
     */
    public static String getLogPath() throws NoSuchFieldException, IllegalAccessException {
        PropertiesConfiguration config = getLogConfiguration();
        String logRef =
                config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream()
                        .map(Object::toString)
                        .filter(
                                ref ->
                                        ref.contains(ROUTING_APPENDER)
                                                || ref.contains(FILE_APPENDER))
                        .findFirst()
                        .orElse(StringUtils.EMPTY);
        if (logRef.equals(ROUTING_APPENDER)) {
            return parentDirectory(getRoutingLogFilePath(config), logRef);
        } else if (logRef.equals(FILE_APPENDER)) {
            // The directory must be derived from the file appender's own path; indexing it with
            // the routing appender's separator position yields a wrong or out-of-range cut.
            return parentDirectory(getFileLogPath(config), logRef);
        } else {
            throw new IllegalArgumentException(
                    String.format("Log file path is empty, get logRef : %s", logRef));
        }
    }

    /** Returns the active Log4j2 configuration, which is expected to be properties-based. */
    private static PropertiesConfiguration getLogConfiguration() {
        LoggerContext context = (LoggerContext) LogManager.getContext(false);
        return (PropertiesConfiguration) context.getConfiguration();
    }

    /** Strips the file name from a log file path, rejecting paths that cannot be split. */
    private static String parentDirectory(String logFilePath, String appenderName) {
        if (logFilePath == null) {
            throw new IllegalArgumentException(
                    String.format(
                            "No log file name is configured for appender : %s", appenderName));
        }
        int separatorIndex = logFilePath.lastIndexOf(PATH_SEPARATOR);
        if (separatorIndex < 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Log file name %s of appender %s has no parent directory",
                            logFilePath, appenderName));
        }
        return logFilePath.substring(0, separatorIndex);
    }

    /** Reads the private appenders component that the configuration was built from. */
    private static Component getAppendersComponent(PropertiesConfiguration config)
            throws NoSuchFieldException, IllegalAccessException {
        Field appendersField = BuiltConfiguration.class.getDeclaredField(APPENDERS_FIELD);
        appendersField.setAccessible(true);
        return (Component) appendersField.get(config);
    }

    /**
     * Resolves the first {@code fileName} found three component levels below the appender named
     * {@code routingAppender} (presumably routes, route, then the nested appender; confirm
     * against the log4j2 properties file), or {@code null} when there is none.
     */
    private static String getRoutingLogFilePath(PropertiesConfiguration config)
            throws NoSuchFieldException, IllegalAccessException {
        Component appenders = getAppendersComponent(config);
        StrSubstitutor substitutor = config.getStrSubstitutor();
        return appenders.getComponents().stream()
                .filter(
                        component ->
                                ROUTING_APPENDER.equals(
                                        component.getAttributes().get(NAME_ATTRIBUTE)))
                .flatMap(component -> component.getComponents().stream())
                .flatMap(component -> component.getComponents().stream())
                .flatMap(component -> component.getComponents().stream())
                .map(component -> component.getAttributes().get(FILE_NAME_ATTRIBUTE))
                // findFirst() rejects null elements, so skip components without a file name.
                .filter(fileName -> fileName != null)
                .map(fileName -> substitutor.replace(fileName))
                .findFirst()
                .orElse(null);
    }

    /**
     * Resolves the {@code fileName} of the appender named {@code fileAppender}, or {@code null}
     * when no such appender declares one.
     */
    private static String getFileLogPath(PropertiesConfiguration config)
            throws NoSuchFieldException, IllegalAccessException {
        Component appenders = getAppendersComponent(config);
        StrSubstitutor substitutor = config.getStrSubstitutor();
        return appenders.getComponents().stream()
                .filter(
                        component ->
                                FILE_APPENDER.equals(
                                        component.getAttributes().get(NAME_ATTRIBUTE)))
                .map(component -> component.getAttributes().get(FILE_NAME_ATTRIBUTE))
                // findFirst() rejects null elements, so skip components without a file name.
                .filter(fileName -> fileName != null)
                .map(fileName -> substitutor.replace(fileName))
                .findFirst()
                .orElse(null);
    }
}
Loading

0 comments on commit 5089d8a

Please sign in to comment.