Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][Metrics] Add Seatunnel Metrics module #2888

Merged
merged 22 commits into from
Oct 27, 2022

Conversation

lvlv-feifei
Copy link
Contributor

#2735
#2592

Purpose of this pull request

Complete the code of seatuunel-metrics according to the framework of the issue, and add module seatuunel-metrics, including seatuunel-core, seatuunel-flink, seatuunel-spark three modules.
seatuunel-core is responsible for the basic definition of the seatunnel indicator system, including the definition of indicators and external interfaces
seatuunel-flink is responsible for the connection between seatunel and flink metrics, and accepts flink metrics
seatuunel-spark is responsible for the connection between seatuunel and spark metrics, and accepts spark metrics

Check list

@Hisoka-X
Copy link
Member

Please solved ci problems. Thanks

@Hisoka-X Hisoka-X changed the title Seatunnel-metrics [Core][Metrics] Add Seatunnel Metrics module Sep 26, 2022
if(!config.hasPath(ConfigKeyName.Metric_Class)){
return StreamExecutionEnvironment.getExecutionEnvironment();
}
//构建flink-metrics参数
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use english only

@@ -322,4 +326,61 @@ private void setCheckpoint() {
}
}

public StreamExecutionEnvironment creatMetricStreamEEnvironment() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public StreamExecutionEnvironment creatMetricStreamEEnvironment() {
public StreamExecutionEnvironment creatStreamEnvironment() {

Not only have metrics config will run this method.

@@ -45,4 +45,9 @@ private ConfigKeyName() {
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
public static final String STATE_BACKEND = "execution.state.backend";
public static final String PLANNER = "execution.planner";
public static final String Metric_Interval = "execution.metrics.interval";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use unified name in spark/flink. There config name is for SeaTunnel, not only for Flink or Spark.

.stringType()
.noDefaultValue();

Configuration seatunnel_reporter = new Configuration().set(REPORTERS_LIST, "seatunnel_reporter").set(REPORTER_CLASS, "org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use lower camel case for field and parameter. Change seatunnel_reporter to seatunnelReporter

return StreamExecutionEnvironment.getExecutionEnvironment();
}
//构建flink-metrics参数
ConfigOption<String> REPORTERS_LIST =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can create an new method called initMetricConfig(Configuration config) to cover this logic.


<dependencies>
<dependency>
<groupId>io.prometheus</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Core can't contains prometheus, your should create new module call seatunnel-metrics-prometheus, then implement your metrics push to prometheus logic in there. The core should only have interface for metrics and common code.

/**
* A reporter which outputs measurements to PrometheusPushGateway
*/
public class PrometheusPushGatewayReporter implements MetricReporter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to seatunnel-metrics-prometheus

builder.append("helpString: ")
.append(this.helpString)
.append(lineSeparator);
for(int i=0;i<this.dimensionKeys.size();i++){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code need be formated.

jobName = config.getString("jobName","flinkJob");
//config.
//String string = metricConfig.getString("name", "de");
//log.info("StreamMetricReporter init:{}", string);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove useless code.

* exports Flink metrics to Seatunnel
*/
public class SeatunnelMetricReporter extends AbstractSeatunnelReporter implements Scheduled {
private static final Logger log = LoggerFactory.getLogger(SeatunnelMetricReporter.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final Logger log = LoggerFactory.getLogger(SeatunnelMetricReporter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(SeatunnelMetricReporter.class);

lvlv-feifei and others added 7 commits September 26, 2022 15:58
* [Improve][DOC] Perfect the connector v2 doc

* Update seatunnel-connectors-v2/README.zh.md

Co-authored-by: Hisoka <[email protected]>

* [Improve][DOC] A little tinkering

* [Improve][DOC] A little tinkering

* [Doc][connector] add Console sink doc

close apache#2794

* [Doc][connector] add Console sink doc

close apache#2794

* fix some problem

* fix some problem

* fine tuning

Co-authored-by: Hisoka <[email protected]>
@CalvinKirs CalvinKirs added the feature New feature label Sep 26, 2022
@ashulin ashulin marked this pull request as draft September 26, 2022 18:05
Copy link
Member

@ashulin ashulin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I have a different idea about Metrics.
For various engines, they already have metrics and metric reporters, and engine users have also managed metrics based on their metrics reporters.
Seatunnel just submits an engine job, and it is unreasonable for users to do additional metrics management.
My opinion is that seatunnel provides unified metrics for the connector-v2 api, and then adds the metrics of seatunnel to the existing reporter of the engine.
For older versions of the connector, the engine's own metrics are used directly.
The metrics reporter of seatunnel may only be used for the engine of seatunnel itself.

@Hisoka-X
Copy link
Member

My opinion is that seatunnel provides unified metrics for the connector-v2 api, and then adds the metrics of seatunnel to the existing reporter of the engine.

This is a feature will do next.

For older versions of the connector, the engine's own metrics are used directly.

There alway can use engine own metrics both old version connector and connector-v2

The metrics reporter of seatunnel may only be used for the engine of seatunnel itself.

At now, our reporter can support flink/spark/seatunnel with a little work. It is not conflict with their own reporter.

histogramsIndex.put(new SimpleHistogram(key.getCount(), key.getStatistics().getMin(), key.getStatistics().getMax(), key.getStatistics().getStdDev(), key.getStatistics().getMean(), quantile), metric.getValue());
}
//todo handle user config
reporter = new PrometheusPushGatewayReporter(jobName, host, port);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved outside the SeatunnelMetricReporter#report() method?

@Override
public PrometheusPushGatewayReporter open() {
//todo Handle user config
return new PrometheusPushGatewayReporter("flink_prometheus_job", "localhost", DEFAULT_PORT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create PrometheusPushGatewayReporter object again?


/** Reporters are used to export seatunnel {@link Metric Metrics} to an external backend. */
public interface MetricReporter {
MetricReporter open();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return MetricReporter object? this is object method

}),
newMetricInfo(metricName, dimensionKeys, dimensionValues))
}
val reporter = new PrometheusPushGatewayReporter(pollJobName, pollHost, pollPort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

@hailin0
Copy link
Member

hailin0 commented Sep 28, 2022

check file license header

# Conflicts:
#	seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf
2. Use reflection to automate assembly
3. Modify the flink/spark startup function
4. Try packaging configuration (todo)
@Hisoka-X Hisoka-X changed the base branch from dev to st-metrics October 20, 2022 02:17
@Hisoka-X Hisoka-X marked this pull request as ready for review October 20, 2022 02:19
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Hisoka-X
Copy link
Member

We merge it into st-metrics branch, should have long term to develop it before ready.

@Hisoka-X Hisoka-X merged commit 0567e2a into apache:st-metrics Oct 27, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants