
[SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay. #31944

Closed

Conversation

yijiacui-db
Contributor

yijiacui-db commented Mar 23, 2021

What changes were proposed in this pull request?

This pull request proposes a new API for streaming sources to signal that they can report metrics, and adds a use case in which the Kafka micro-batch stream reports the number of offsets by which the current offset falls behind the latest.

A public interface is added.

metrics: returns the metrics reported by the streaming source with the given offset.
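For illustration, here is a minimal Scala sketch of the shape of such an interface; the trait name, parameter, and return type are illustrative assumptions rather than the exact signature in the patch:

import java.util.Optional
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}

// A streaming source mixes this in to declare it can report custom metrics.
trait ReportsSourceMetrics extends SparkDataStream {
  // Metrics for the given latest consumed offset, keyed by metric name;
  // an empty Optional means no offset has been consumed yet.
  def metrics(latestConsumedOffset: Optional[Offset]): java.util.Map[String, String]
}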

Why are the changes needed?

The new API can expose any custom metrics for the "current" offset of a streaming source. Different from #31398, this PR makes the metrics available to users through the progress report, not through the Spark UI. One use case is that people want to know how far the current offset falls behind the latest offset.
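As a rough sketch of how a user might consume these metrics from the progress report (assuming a metrics map on SourceProgress, as this PR proposes, and an active SparkSession named spark):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each SourceProgress would carry the metrics map reported by its source.
    event.progress.sources.foreach { s =>
      println(s"${s.description} metrics: ${s.metrics}")
    }
  }
})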

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests for the Kafka micro-batch source v2 are added to cover the Kafka use case.

@viirya
Member

viirya commented Mar 23, 2021

Hi @yijiacui-db, thanks for working on this. I'm still working on this and the related API. The community has an agreement (SPARK-34366, PR #31476) on the metrics API for data sources; it was merged recently. As the next step, I will implement the API in data sources, including the Kafka data source. If you are interested in this work, you're welcome to help review it later. Thanks.

@yijiacui-db
Contributor Author

yijiacui-db commented Mar 23, 2021

Hi @yijiacui-db, thanks for working on this. I'm still working on this and the related API. The community has an agreement (SPARK-34366, PR #31476) on the metrics API for data sources. As the next step, I will implement the API in data sources, including the Kafka data source. If you are interested in this work, you're welcome to help review it later. Thanks.

Hi @viirya, this pull request exposes the backlog metrics to end users through the streaming query progress, not through the metrics in the Spark UI. I think it makes sense to combine both of them to expose metrics at different levels. Sorry I used your JIRA number; I thought that was the number I should use. I can make another JIRA to track this work if you prefer.

@viirya
Member

viirya commented Mar 23, 2021

Since the purpose is different, I think it is better to create another JIRA. Thanks.

@xuanyuanking
Member

xuanyuanking commented Mar 24, 2021

Agree with @viirya on creating another JIRA. Let's link the two tickets together.
IMO, we should have the metrics both in the UI and in the progress reporter. We can do follow-ups to combine the code (e.g., the metrics collection part).

Also cc @HeartSaVioR @gaborgsomogyi @bozhang2820 @Ngone51 @zsxwing

@yijiacui-db
Contributor Author

Agree with @viirya on creating another JIRA. Let's link the two tickets together.
IMO, we should have the metrics both in the UI and in the progress reporter. We can do follow-ups to combine the code (e.g., the metrics collection part).

Also cc @HeartSaVioR @gaborgsomogyi @bozhang2820 @Ngone51 @zsxwing

@viirya @xuanyuanking Thanks. I opened SPARK-34854 to track this work.

yijiacui-db deleted the SPARK-34297 branch March 24, 2021 07:51
@xuanyuanking
Member

It's OK to keep this PR open; just change the title to the new JIRA number. :) @yijiacui-db

yijiacui-db changed the title [SPARK-34297][SQL] Expose source metrics API and Add Kafka metrics to report delay. [SPARK-34854][SQL][SS] Expose source metrics API and Add Kafka metrics to report delay. Mar 24, 2021
yijiacui-db restored the SPARK-34297 branch March 24, 2021 07:53
yijiacui-db reopened this Mar 24, 2021
yijiacui-db changed the title [SPARK-34854][SQL][SS] Expose source metrics API and Add Kafka metrics to report delay. [SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay. Mar 24, 2021
@yijiacui-db
Contributor Author

It's OK to keep this PR open; just change the title to the new JIRA number. :) @yijiacui-db

@xuanyuanking Thanks! I reopened it and changed the JIRA number and the description.

Member

viirya left a comment

We have recorded the start offset, end offset, and latest offset in SourceProgress. Looks like the metrics here are somewhat duplicated?

@HeartSaVioR
Contributor

Yeah, I guess it's just a matter of parsing and calculation on the user end.

  • Do you have a specific use case leveraging this information?
  • Are you planning to integrate this information into the Spark UI or somewhere else?
  • Could you please try out a recent version of Spark and check the available information on SourceProgress, and see whether it could solve the same use case, despite requiring some more calculation?

@gaborgsomogyi
Contributor

  • Do you have a specific use case leveraging this information?

+1 on @HeartSaVioR's comment; it would be good to see the big picture before we go into the details.
Regarding the UI, it's worth creating a separate JIRA.

@yijiacui-db
Contributor Author

yijiacui-db commented Mar 24, 2021

Yeah, I guess it's just a matter of parsing and calculation on the user end.

  • Do you have a specific use case leveraging this information?
  • Are you planning to integrate this information into the Spark UI or somewhere else?
  • Could you please try out a recent version of Spark and check the available information on SourceProgress, and see whether it could solve the same use case, despite requiring some more calculation?

@viirya @HeartSaVioR I don't think that's duplicated information in the source progress. What's recorded in the source progress now is the latest offset consumed by the stream, not the latest offset available in the source. Take Kafka as an example: we can have a read limit while consuming offsets, so we consume only a certain number of offsets, but the data available in Kafka is more than that. The same applies to all other streaming sources. Some users want to know, through the listener, whether they are falling behind, so they can adjust the cluster size accordingly.

I don't think the current source progress can be used to calculate the information I mentioned above, because the latest available offset is internal to the source; there's no way to know it from the current source progress.

I didn't have a plan to integrate this information with the Spark UI. That's something I can work on after @viirya's PR is merged; I can refactor and adjust accordingly to see whether this metrics information can be exposed through the Spark UI too.
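To make the "falling behind" idea concrete, here is a rough Scala sketch of the kind of computation a source could report; the metric names and input maps are assumptions for illustration, not the PR's exact code:

import org.apache.kafka.common.TopicPartition

// Summarize how far consumption is behind availability, per partition.
def offsetsBehindLatest(
    latestAvailable: Map[TopicPartition, Long],
    latestConsumed: Map[TopicPartition, Long]): Map[String, String] = {
  val lags = latestAvailable.map { case (tp, latest) =>
    // Treat a partition with nothing consumed yet as fully caught up here;
    // a real implementation would choose this policy deliberately.
    math.max(0L, latest - latestConsumed.getOrElse(tp, latest))
  }.toSeq
  if (lags.isEmpty) {
    Map.empty
  } else {
    Map(
      "minOffsetsBehindLatest" -> lags.min.toString,
      "maxOffsetsBehindLatest" -> lags.max.toString,
      "avgOffsetsBehindLatest" -> (lags.sum.toDouble / lags.size).toString)
  }
}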

@viirya
Member

viirya commented Mar 24, 2021

The latest offset in source progress is the latest offset available in the source, not the latest consumed offset by the stream.

@yijiacui-db
Contributor Author

yijiacui-db commented Mar 24, 2021

The latest offset in source progress is the latest offset available in the source, not the latest consumed offset by the stream.

@viirya That's a good point. I was referring to the latest consumed offset used in the metrics method, without realizing that the latest available offset is reported by Kafka through reportLatestOffset. While implementing this metrics interface, it seemed more general to also cover sources that don't implement reportLatestOffset, so that they can do some computation based on the consumed offset and report stats back.

It's definitely true that for the Kafka source this API isn't strictly necessary, because of the reported latest offset. @zsxwing @tdas Do you think we should remove the Kafka source use case because it's somewhat duplicated? And if so, do we still want to merge the metrics API alone into apache/spark?

@HeartSaVioR
Contributor

What other metrics do you have in mind? I see the flexibility in adding some sort of custom metrics, but I want to make sure there are clear actual use cases, as we're adding a public API here.

@HeartSaVioR
Contributor

It's not only true for the Kafka source. We'd need to assume it can be true for all data sources migrated to Spark 3.2.0, as the API is available there.

That said, if the only imagined use case of the new API is the same purpose as #30988, I feel it is duplicated. That's the reason I'd like to see more use cases.

@xuanyuanking
Member

xuanyuanking commented Mar 25, 2021

Besides the flexibility, I can see that the new API here helps with the extensibility of the progress reporter. If new customized metrics are needed in the future, we don't need to change the top level of the output JSON string; we can just add new fields in the map-like field. It also gives us the possibility to customize metrics for different SparkDataStream implementations.

It's definitely true that for the Kafka source this API isn't strictly necessary, because of the reported latest offset.

Yes. Maybe you can also move the metrics added in #30988 to your new implementation. Either way is OK for me. cc @viirya for more opinions.

Agree with Gabor and Jungtaek that we need to provide more use cases for the new API. Besides the metrics added for the Kafka source in this PR, from what I'm thinking, we could use this new API to expose more customized metrics just for FileStreamSource, e.g., the number of files or bytes outstanding.

Of course, end users can implement this in their customized SparkDataStream. Maybe @yijiacui-db can provide more use cases.

@gaborgsomogyi
Contributor

Take Kafka as an example: we can have a read limit while consuming offsets, so we consume only a certain number of offsets, but the data available in Kafka is more than that. The same applies to all other streaming sources. Some users want to know, through the listener, whether they are falling behind, so they can adjust the cluster size accordingly.

If I understand correctly, the plan is to monitor whether Spark is behind in Kafka processing. If that's true, there is an existing solution for this which works like a charm: the user can commit the offsets back to Kafka with a listener, and the delta between the available and committed offsets can be monitored (a rough sketch of this pattern is shown below). If this were the only use case, I'm not 100% sure it's worth the ~300-line change set in the Kafka part.

I'd like to emphasize that I'm not against making this better; it would just be good to see a bit more from the use-case perspective.
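For reference, a rough sketch of the existing workaround described above; the class name, the assumed endOffset JSON shape ({"topic":{"partition":offset}}), and the consumer wiring are illustrative assumptions, not code from this PR:

import java.util.{HashMap => JHashMap}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Commits each batch's processed offsets back to Kafka so that external
// tooling can monitor consumer-group lag. Note that KafkaConsumer is not
// thread-safe, so a production version needs synchronization around it.
class CommitProcessedOffsets(consumer: KafkaConsumer[_, _]) extends StreamingQueryListener {
  private implicit val formats: Formats = DefaultFormats

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { s =>
      // Parse the processed end offsets out of the progress report.
      val ends = parse(s.endOffset).extract[Map[String, Map[String, Long]]]
      val toCommit = new JHashMap[TopicPartition, OffsetAndMetadata]()
      for ((topic, parts) <- ends; (partition, offset) <- parts) {
        toCommit.put(new TopicPartition(topic, partition.toInt), new OffsetAndMetadata(offset))
      }
      consumer.commitSync(toCommit)
    }
  }
}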

@gaborgsomogyi
Contributor

FYI, there was a PR to commit the offsets back from Spark itself, but it didn't satisfy everybody's needs, so it was not merged.

@yijiacui-db
Contributor Author

@gaborgsomogyi @xuanyuanking We have some customized sources built to meet users' requirements. The users want to dynamically adjust the size of the cluster based on how far they're falling behind the latest offset. With this PR, this metric can be exposed to them through the source progress at the end of each batch.

@tdas @zsxwing Feel free to correct me / add more details.

@xuanyuanking
Member

The users want to dynamically adjust the size of the cluster based on how far they're falling behind the latest offset. With this PR, this metric can be exposed to them through the source progress at the end of each batch.

It sounds like a reasonable use case to me. cc @HeartSaVioR @gaborgsomogyi for their opinions.
I'll take a close look at the code today.

Member

xuanyuanking left a comment

Generally LGTM. Let's see the opinions from others.

@gaborgsomogyi
Contributor

Dynamic allocation based on load is something that is important to users, and I support the direction, so I'm fine with this.
I need some time to test the PR on a cluster though...

@gaborgsomogyi
Contributor

@viirya please make a judgment on how this fits into the whole picture, since I'm not involved in the metrics API development.

@viirya
Member

viirya commented Apr 28, 2021

I've tested it on a real cluster and it works fine.
Just a question: how is this intended to be used for dynamic allocation?

Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto-scaling.

This is a valid use case. But my question is that the current offsets in SourceProgress should already provide the information this use case needs (consumed offset, available offset). The source progress should also be available for the customized SparkDataStream. Do you mean the metrics from the customized SparkDataStream are not offset-related?

@HeartSaVioR
Contributor

I've tested it on a real cluster and it works fine.
Just a question: how is this intended to be used for dynamic allocation?

Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto-scaling.

This is a valid use case. But my question is that the current offsets in SourceProgress should already provide the information this use case needs (consumed offset, available offset).

That is what I understand as well - it is just a matter of "where" we want to put the calculation.

I have mixed feelings about this:

  1. If the target persona is a human, then I'd rather not make them calculate it by themselves. It would be helpful to let Spark calculate and provide the information instead.

  2. If the target persona is a "process" (maybe the Spark driver or some external app?), then it should not be that hard for it to calculate by itself.

I'm not sure which is the actual use case for this PR.

@yijiacui-db
Contributor Author

yijiacui-db commented Apr 28, 2021

I've tested it on a real cluster and it works fine.
Just a question: how is this intended to be used for dynamic allocation?

Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto-scaling.

This is a valid use case. But my question is that the current offsets in SourceProgress should already provide the information this use case needs (consumed offset, available offset). The source progress should also be available for the customized SparkDataStream. Do you mean the metrics from the customized SparkDataStream are not offset-related?

Yes. The available offset is retrieved through reportLatestOffset, which Kafka already implements, so that part is duplicated: we can use the latest consumed offset together with the available offset to compute how far we're falling behind.
But for other customized Spark data streams, it's possible that reportLatestOffset isn't implemented, so from the source progress report there's no way to know the latest available offset to do the computation. Also, customized metrics, for example how far the application is falling behind the latest, can be represented in other ways (not only as a number of offsets), depending entirely on how the stream defines them.

We want to introduce this metrics interface to let users implement it for their data streams and obtain the metrics they want from the source progress report. The Kafka stream is just an example of how users can implement this and retrieve that information, but it happens to have the latest available offset, which makes it look a little duplicated and hard to reason about.

@yijiacui-db
Contributor Author

I've tested it on a real cluster and it works fine.
Just a question: how is this intended to be used for dynamic allocation?

Users can implement this interface in their customized SparkDataStream and learn how far it is falling behind through the progress listener. Maybe this can provide more useful information to guide/trigger auto-scaling.

This is a valid use case. But my question is that the current offsets in SourceProgress should already provide the information this use case needs (consumed offset, available offset).

That is what I understand as well - it is just a matter of "where" we want to put the calculation.

I have mixed feelings about this:

  1. If the target persona is a human, then I'd rather not make them calculate it by themselves. It would be helpful to let Spark calculate and provide the information instead.
  2. If the target persona is a "process" (maybe the Spark driver or some external app?), then it should not be that hard for it to calculate by itself.

I'm not sure which is the actual use case for this PR.

@HeartSaVioR This is a good question! I already updated my answer in the comment above, covering how it works and why we need this metrics interface. No matter whether the target persona is a human or a process, it's always possible that what is available as the latest is something internal to the customized Spark data stream that can't be reported as an offset, so it's not possible to calculate the metrics from offsets and report them as offsets.

@HeartSaVioR
Contributor

Yeah, I agree with the rationale and benefits of "adding a public API for custom source metrics", though it'd be even better if we could talk through a real case that isn't covered by #30988.

I feel the reason the review is dragging is the Kafka use case. Your explanation may make sense for "other" data sources (hypothetically, as you haven't provided an actual one), but for the Kafka case it's possible for a specific process to calculate the lag with the change from #30988. I agree it's bad for a human being to calculate the lag per topic partition and summarize it by themselves, but it's still not that hard for a specific process to do.

@yijiacui-db
Contributor Author

yijiacui-db commented Apr 29, 2021

Yeah, I agree with the rationale and benefits of "adding a public API for custom source metrics", though it'd be even better if we could talk through a real case that isn't covered by #30988.

I feel the reason the review is dragging is the Kafka use case. Your explanation may make sense for "other" data sources (hypothetically, as you haven't provided an actual one), but for the Kafka case it's possible for a specific process to calculate the lag with the change from #30988. I agree it's bad for a human being to calculate the lag per topic partition and summarize it by themselves, but it's still not that hard for a specific process to do.

@viirya @HeartSaVioR

A good example is FileStreamSource, which doesn't implement reportLatestOffset, because the latest available data doesn't match the "Offset" representation in Spark streaming.

In FileStreamSource, fetchMaxOffset returns the maximum offset that can be retrieved from the source, which can be rate limited. Only the file source itself knows internally how many files are left to be processed in the batch. Possible metrics to expose to users here are the number of files and the number of bytes remaining to be processed in the batch, i.e., how far the application is falling behind the stream. Those metrics can't be computed from the information currently in the source progress report, so we need the metrics API to expose metrics that can only be computed internally. (A rough sketch follows below.)
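Here is the sketch; the class, helper, and metric names are hypothetical illustrations rather than FileStreamSource's actual code:

import java.io.File
import java.util.Optional
import scala.collection.JavaConverters._

// Only the source knows its remaining backlog, and that backlog is not
// expressible as an Offset. A real implementation would surface this
// method through the source's SparkDataStream via the new metrics API.
class BacklogAwareFileStream(pendingFiles: () => Seq[File]) {
  def metrics(latestConsumedOffset: Optional[AnyRef]): java.util.Map[String, String] = {
    val pending = pendingFiles()
    Map(
      "numFilesOutstanding" -> pending.size.toString,
      "numBytesOutstanding" -> pending.map(_.length()).sum.toString
    ).asJava
  }
}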

/**
 * Returns [[KafkaSourceOffset]] from a streaming.Offset.
 */
def apply(offset: streaming.Offset): KafkaSourceOffset = {
Contributor

Nice to refer to the non-deprecated one :) I missed that the Offset type in the method above is deprecated.

@HeartSaVioR
Contributor

HeartSaVioR commented Apr 29, 2021

Yes, that sounds like a good rationale for a real case. Thanks!

I looked into the changes on the API side, and felt both #30988 and this can co-exist. #30988 covers the specific case where the latest offset can be provided by the data source in Offset format, and this covers more general ("arbitrary" might fit better) cases where the information a data source wants to provide is not limited to the latest offset.

For sure, the actual behavioral change in #30988 could be implemented with the API being added here, but providing general output across data sources would ideally be more useful, e.g., for plotting in the UI. (I know the technical gap here in making it general, as the format of "Offset" varies across data sources and the consumer has to take care of that.)

For the newly added Kafka metrics, they still make sense when the target persona is a human (convenient to check), but otherwise I agree with @viirya that they sound redundant. Although the code change is not huge, it would probably be good to split this into two PRs with two JIRA issues, 1) the API changes and 2) the Kafka metrics, and finalize reviewing 1) first, as there seems to be no outstanding concern about the API changes. We can still go with this PR only, if @viirya is OK with adding redundant information.

@HeartSaVioR
Contributor

cc. @HyukjinKwon @dongjoon-hyun as they reviewed #30988.

@viirya
Member

viirya commented May 3, 2021

Okay, the metrics for FileStreamSource sound like a use case that makes sense. I'm okay with you going ahead with this PR.

Contributor

HeartSaVioR left a comment

+1

As this PR has been around for more than a month, I'll merge it once the tests pass.

@HeartSaVioR
Contributor

retest this, please

@HeartSaVioR
Contributor

@yijiacui-db
I'd appreciate it if you could rerun your GitHub Actions test in your fork. I guess rerunning the action here won't retrigger the test on your fork. We don't have a good way for others (except the author) to retrigger tests automatically yet.

@yijiacui-db
Contributor Author

retest this, please

@yijiacui-db
Contributor Author

retest this please

@SparkQA

SparkQA commented May 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42684/

@SparkQA

SparkQA commented May 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42684/

@SparkQA

SparkQA commented May 5, 2021

Test build #138163 has finished for PR 31944 at commit 2f13f28.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

Thanks! Merging to master!

@HeartSaVioR
Contributor

Thanks all for thoughtful reviewing and thanks @yijiacui-db for the contribution! Merged to master.

@yijiacui-db
Contributor Author

Thanks all for thoughtful reviewing and thanks @yijiacui-db for the contribution! Merged to master.

@HeartSaVioR @xuanyuanking @gaborgsomogyi @viirya Thank you so much for reviewing this PR!
