Skip to content

Commit

Permalink
[Core][Metrics] Add Seatunnel Metrics module (#2888)
Browse files Browse the repository at this point in the history
* seatunnel-metrics init commit

* seatunnel-metrics config add

* codeStyle update

* codeStyle update again

* codeStyle seatunnel-spark update

* [Improve][Connector-V2] Parameter verification for connector V2 kafka sink (#2866)

* parameter verification

* update

* update

* [Improve][DOC] Perfect the connector v2 doc (#2800)

* [Improve][DOC] Perfect the connector v2 doc

* Update seatunnel-connectors-v2/README.zh.md

Co-authored-by: Hisoka <[email protected]>

* [Improve][DOC] A little tinkering

* [Improve][DOC] A little tinkering

* [Doc][connector] add Console sink doc

close #2794

* [Doc][connector] add Console sink doc

close #2794

* fix some problem

* fix some problem

* fine tuning

Co-authored-by: Hisoka <[email protected]>

* add seatunnel-examples from gitignore (#2892)

* [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)

* [Bug][Connector-V2] Fix wechat sink data serialization (#2856)

* [Improve][Connector-V2] Improve orc write strategy to support all data types (#2860)

* [Improve][Connector-V2] Improve orc write strategy to support all data types

Co-authored-by: tyrantlucifer <[email protected]>

* [Bug][seatunnel-translation-base] Fix Source restore state NPE (#2878)

* [Improve][Connector-v2-Fake]Supports direct definition of data values(row) (#2839)

* [Improve][Connector-v2]Supports direct definition of data values(row)

* seatunnel-prometheus update

* seatunnel-prometheus update

* seatunnel-prometheus update

* 1. Seatunnel unified configuration naming
2. Use reflection to automate assembly
3. Modify the flink/spark startup function
4. Try packaging configuration (todo)

Co-authored-by: TaoZex <[email protected]>
Co-authored-by: liugddx <[email protected]>
Co-authored-by: Hisoka <[email protected]>
Co-authored-by: Eric <[email protected]>
Co-authored-by: Xiao Zhao <[email protected]>
Co-authored-by: hailin0 <[email protected]>
Co-authored-by: tyrantlucifer <[email protected]>
Co-authored-by: Laglangyue <[email protected]>
  • Loading branch information
9 people authored Oct 27, 2022
1 parent 1ece805 commit 0567e2a
Show file tree
Hide file tree
Showing 62 changed files with 2,308 additions and 227 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ Test.scala
test.conf
spark-warehouse
*.flattened-pom.xml

seatunnel-examples
8 changes: 8 additions & 0 deletions docs/en/connector-v2/source/FakeSource.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@ just for testing, such as type conversion and feature testing
|-------------------|--------|----------|---------------|
| result_table_name | string | yes | - |
| schema | config | yes | - |
| row.num | long | no | 10 |

### result_table_name [string]

The table name.

### type [string]

Table structure description. You should assign the schema option to tell the connector how to parse data into the rows you want.
**Tips**: Most unstructured data sources, such as LocalFile and HdfsFile, contain this parameter.
**Example**:

### row.num

The number of additional rows of generated data.

```hocon
schema = {
fields {
Expand All @@ -55,7 +61,9 @@ schema = {
```

## Example

Simple source for FakeSource which contains enough datatype

```hocon
source {
FakeSource {
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<module>seatunnel-formats</module>
<module>seatunnel-dist</module>
<module>seatunnel-server</module>
<module>seatunnel-metrics</module>
</modules>

<profiles>
Expand Down
7 changes: 6 additions & 1 deletion seatunnel-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,10 @@
<artifactId>jackson-dataformat-properties</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-metrics-core</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.seatunnel.flink;

import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.flink.util.ConfigKeyName;
Expand All @@ -29,6 +32,7 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
Expand All @@ -47,6 +51,7 @@
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -126,8 +131,8 @@ public void registerPlugin(List<URL> pluginPaths) {
List<Configuration> configurations = new ArrayList<>();
try {
configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
"getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " +
"method: getConfiguration")).invoke(this.environment));
"getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " +
"method: getConfiguration")).invoke(this.environment));
if (!isStreaming()) {
configurations.add(batchEnvironment.getConfiguration());
}
Expand Down Expand Up @@ -161,9 +166,9 @@ public StreamTableEnvironment getStreamTableEnvironment() {
private void createStreamTableEnvironment() {
// use blink and streammode
EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
.inStreamingMode();
.inStreamingMode();
if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink"
.equals(this.config.getString(ConfigKeyName.PLANNER))) {
.equals(this.config.getString(ConfigKeyName.PLANNER))) {
envBuilder.useBlinkPlanner();
} else {
envBuilder.useOldPlanner();
Expand All @@ -173,15 +178,15 @@ private void createStreamTableEnvironment() {
tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
TableConfig config = tableEnvironment.getConfig();
if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config
.hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
.hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
}
}

private void createStreamEnvironment() {
environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment = creatMetricEnvironment();
setTimeCharacteristic();

setCheckpoint();
Expand Down Expand Up @@ -244,8 +249,8 @@ private void setTimeCharacteristic() {
break;
default:
LOGGER.warn(
"set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
timeType);
"set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
timeType);
break;
}
}
Expand All @@ -268,8 +273,8 @@ private void setCheckpoint() {
break;
default:
LOGGER.warn(
"set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
mode);
"set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
mode);
break;
}
}
Expand Down Expand Up @@ -302,10 +307,10 @@ private void setCheckpoint() {
boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
if (cleanup) {
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
} else {
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

}
}
Expand All @@ -322,4 +327,71 @@ private void setCheckpoint() {
}
}

/**
 * Creates the Flink stream execution environment, wiring in the SeaTunnel
 * metrics reporter when one is configured via {@code seatunnel.metrics.class}.
 *
 * @return a {@link StreamExecutionEnvironment}, with reporter settings applied
 *         only if a metrics reporter class is present in the job config
 */
public StreamExecutionEnvironment creatMetricEnvironment() {
    if (config.hasPath(CollectionConstants.METRICS_CLASS)) {
        // Metrics are opt-in: build the reporter Configuration and hand it to Flink.
        return StreamExecutionEnvironment.getExecutionEnvironment(initMetricConfig());
    }
    // No reporter configured — fall back to the plain default environment.
    return StreamExecutionEnvironment.getExecutionEnvironment();
}

/**
 * Translates SeaTunnel metrics settings from the job config into a Flink
 * {@link Configuration} that registers the SeaTunnel metric reporter.
 *
 * <p>Optional settings (interval, port, host, jobName) are copied only when
 * present; the reporter name is read unconditionally because the caller only
 * invokes this method after checking that {@code seatunnel.metrics.class} exists.
 *
 * @return a Configuration carrying the {@code metrics.reporter.seatunnel_reporter.*} keys
 */
private Configuration initMetricConfig() {
    // Reporting interval (seconds) used when seatunnel.metrics.interval is absent.
    final int defaultDuration = 10;
    // Common prefix shared by every per-reporter Flink metrics key.
    final String reporterPrefix = "metrics.reporter.seatunnel_reporter.";

    ConfigOption<String> reportersList =
            key("metrics.reporters").stringType().noDefaultValue();
    ConfigOption<String> reporterClass =
            key(reporterPrefix + "class").stringType().noDefaultValue();
    ConfigOption<Duration> reporterInterval =
            key(reporterPrefix + "interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(defaultDuration));
    ConfigOption<String> reporterConfigPort =
            key(reporterPrefix + "port").stringType().noDefaultValue();
    ConfigOption<String> reporterConfigHost =
            key(reporterPrefix + "host").stringType().noDefaultValue();
    ConfigOption<String> reporterConfigJobName =
            key(reporterPrefix + "jobName").stringType().noDefaultValue();
    ConfigOption<String> reporterConfigReporterName =
            key(reporterPrefix + "reporterName").stringType().noDefaultValue();

    // Register the SeaTunnel reporter under the fixed name "seatunnel_reporter".
    Configuration metricConf = new Configuration()
            .set(reportersList, "seatunnel_reporter")
            .set(reporterClass, "org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter");

    if (config.hasPath(CollectionConstants.METRICS_INTERVAL)) {
        metricConf.set(reporterInterval,
                Duration.ofSeconds(config.getLong(CollectionConstants.METRICS_INTERVAL)));
    }
    if (config.hasPath(CollectionConstants.METRICS_PORT)) {
        metricConf.set(reporterConfigPort, config.getString(CollectionConstants.METRICS_PORT));
    }
    if (config.hasPath(CollectionConstants.METRICS_HOST)) {
        metricConf.set(reporterConfigHost, config.getString(CollectionConstants.METRICS_HOST));
    }
    if (config.hasPath(CollectionConstants.METRICS_JOB_NAME)) {
        metricConf.set(reporterConfigJobName, config.getString(CollectionConstants.METRICS_JOB_NAME));
    }
    metricConf.set(reporterConfigReporterName,
            config.getString(CollectionConstants.METRICS_CLASS));
    return metricConf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,9 @@ private ConfigKeyName() {
// Flink SQL idle-state retention bounds (seconds), applied in createStreamTableEnvironment.
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
// State backend selection key, e.g. rocksdb.
public static final String STATE_BACKEND = "execution.state.backend";
// Table planner selection ("blink" vs. old planner).
public static final String PLANNER = "execution.planner";
// NOTE(review): the visible FlinkEnvironment diff reads metrics settings via the
// "seatunnel.metrics.*" keys in CollectionConstants, not these "execution.metrics.*"
// keys — confirm whether these duplicates are actually read anywhere.
public static final String METRICS_INTERVAL = "execution.metrics.interval";
public static final String METRICS_CLASS = "execution.metrics.class";
public static final String METRICS_PORT = "execution.metrics.port";
public static final String METRICS_JOB_NAME = "execution.metrics.jobName";
public static final String METRICS_HOST = "execution.metrics.host";
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void registerPlugin(List<URL> pluginPaths) {
public SparkEnvironment prepare() {
SparkConf sparkConf = createSparkConf();
SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
creatMetricBuilder(builder);
if (enableHive) {
builder.enableHiveSupport();
}
Expand Down Expand Up @@ -178,6 +180,26 @@ public static <T extends Object> T sinkProcess(SparkEnvironment environment, Bas
}
return sink.output(fromDs, environment);
}

/**
 * Applies SeaTunnel metrics settings from the job config to the Spark session builder.
 *
 * <p>Metrics are opt-in: when {@code seatunnel.metrics.class} is absent the builder
 * is returned unmodified. Optional settings (host, port, jobName, interval) are
 * copied only when present in the config.
 *
 * @param builder the Spark session builder to configure (mutated in place)
 * @return the same builder, for chaining
 */
private SparkSession.Builder creatMetricBuilder(SparkSession.Builder builder) {
    if (!config.hasPath(CollectionConstants.METRICS_CLASS)) {
        return builder;
    }
    // Common prefix for the SeaTunnel metric sink entries in Spark's metrics config.
    final String sinkPrefix = "spark.metrics.conf.*.sink.console.";
    builder.config(sinkPrefix + "class",
            "org.apache.spark.seatunnel.metrics.sink.SeatunnelMetricSink");
    copyMetricSettingIfPresent(builder, sinkPrefix + "host", CollectionConstants.METRICS_HOST);
    copyMetricSettingIfPresent(builder, sinkPrefix + "port", CollectionConstants.METRICS_PORT);
    copyMetricSettingIfPresent(builder, sinkPrefix + "jobName", CollectionConstants.METRICS_JOB_NAME);
    copyMetricSettingIfPresent(builder, sinkPrefix + "interval", CollectionConstants.METRICS_INTERVAL);
    builder.config(sinkPrefix + "reporterName", config.getString(CollectionConstants.METRICS_CLASS));
    return builder;
}

/** Copies one optional SeaTunnel setting into the Spark builder when the config key exists. */
private void copyMetricSettingIfPresent(SparkSession.Builder builder, String sparkKey, String seatunnelKey) {
    if (config.hasPath(seatunnelKey)) {
        builder.config(sparkKey, config.getString(seatunnelKey));
    }
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@
/**
 * Shared constants used across SeaTunnel modules, including the config keys
 * that the Flink and Spark environments read to set up the metrics reporter.
 */
public class CollectionConstants {

    // NOTE(review): appears to be a default map sizing hint — usage not visible here.
    public static final int MAP_SIZE = 6;

    // Job-config keys for the SeaTunnel metrics reporter (seatunnel.metrics.*).
    public static final String METRICS_INTERVAL = "seatunnel.metrics.interval";
    public static final String METRICS_CLASS = "seatunnel.metrics.class";
    public static final String METRICS_PORT = "seatunnel.metrics.port";
    public static final String METRICS_JOB_NAME = "seatunnel.metrics.jobName";
    public static final String METRICS_HOST = "seatunnel.metrics.host";

    // Constants holder: prevent instantiation, matching ConfigKeyName's
    // private-constructor convention.
    private CollectionConstants() {
    }
}
Loading

0 comments on commit 0567e2a

Please sign in to comment.