Monitor Job Vertices State on Deploy #284
Conversation
jobFinalizer = "job.finalizers.flink.k8s.io" | ||
statusChanged = true | ||
statusUnchanged = false | ||
jobVertexStateTimeoutInMinute = 3 |
Can this be part of the config so that it can be set through configmap?
Yes, it could be. But even if we set it through the configmap, we still need a default value in code in case this key is not set in the configMap. If we later have a use case where flinkk8soperator users want to override the default value, we can consider adding it then.
Correct. You can add the default value as well.
package config
+1 to adding this as a configuration for the user. Any arbitrary waits should be configurable, even for our own sake of finding the optimal time that ensures a safe deploy while decreasing deploy times.
Additionally, I like the "duration" syntax used in the configmap linked above, as it does not restrict the value to minutes, which would be less flexible.
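A rough sketch of that idea, using a hypothetical config struct and field name rather than the operator's actual config API: expose the timeout as a duration with an in-code default, so a missing configmap key still behaves sensibly and the value is not restricted to whole minutes.

```go
package config

import "time"

// defaultJobVertexStateTimeout is the in-code fallback used when the configmap
// does not set the key (hypothetical name, for illustration only).
const defaultJobVertexStateTimeout = 3 * time.Minute

type Config struct {
	// JobVertexStateTimeout bounds how long the operator waits for all vertices
	// to reach RUNNING; a duration avoids restricting the setting to minutes.
	JobVertexStateTimeout time.Duration `json:"jobVertexStateTimeout"`
}

// VertexStateTimeout returns the configured timeout, falling back to the default.
func (c Config) VertexStateTimeout() time.Duration {
	if c.JobVertexStateTimeout <= 0 {
		return defaultJobVertexStateTimeout
	}
	return c.JobVertexStateTimeout
}
```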
if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" {
s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning)
// wait until all vertices have been scheduled and running
jobStartTimeSec := job.StartTime / 1000
Is the job start time reliable? I believe this will get reset when the JM restarts or the job fails and restarts.
Hmm, good point. Let me verify it tomorrow.
I have verified that the start time will not reset after job restart.
{"jobs":[{"jid":"70ec861c2ddf20072a1d2e3f1aff3bd7","name":"nmodes","state":"RESTARTING","start-time":1680713677111,"end-time":-1,"duration":1235989,"last-modification":1680714880571,"tasks":{"total":192,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":191,"failed":1,"reconciling":0,"initializing":0}}]}
The job is in RESTARTING state, but start-time is Wednesday, April 5, 2023 9:54:37.111 AM GMT-07:00 DST, which is the time I triggered the deployment.
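As a side note, a minimal sketch (with hypothetical names, not the PR's code) of the elapsed-time check this thread is about: Flink reports start-time in epoch milliseconds, so it has to be converted before comparing against the configured timeout.

```go
package flink

import "time"

// vertexStateTimedOut reports whether the job has been running longer than the
// allowed window without all vertices reaching RUNNING. jobStartTimeMillis is
// Flink's start-time field, which is in epoch milliseconds.
func vertexStateTimedOut(jobStartTimeMillis int64, now time.Time, timeout time.Duration) bool {
	jobStart := time.UnixMilli(jobStartTimeMillis)
	return now.Sub(jobStart) > timeout
}
```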
ptal @premsantosh @maghamravi #streaming-compute-dev-prs
nit: Can you make the PR title a bit more descriptive?
for _, v := range job.Vertices {
allVerticesStarted = allVerticesStarted && (v.StartTime > 0)
allVerticesRunning := true
if app.Spec.BetaFeaturesEnabled {
Are we documenting what features are in beta?
This is not a standard operator pattern.
Primarily because this flag getting updated will cause applications to restart (if the hash calculation includes this flag).
@sethsaperstein-lyft I will add more to the crd.md file to document this.
@anandswaminathan sorry, I didn't follow why updating the flag would cause the application to restart. Can you elaborate? My understanding is that this field is set in the application jsonnet file and the value is passed in when a new deployment/update is made, but this field can't be changed at runtime.
So here is what you are doing: you are looking for a way to have some applications be exposed to this feature.
This is not commonly done in a CRD spec and operator, as it is not something you want to keep for the long run. There is no direct correlation between Flink and "BetaFeaturesEnabled". You are now setting a goal that all the apps will have "BetaFeaturesEnabled" set to true. Say you remove the flag some day later (to make the behavior the default): all the apps that have this set need to revert, and we are looking at updating hundreds of apps (depending on scale).
If you are worried about the reliability of the change, maybe pass a version as a label (if we don't already do so) and key the logic on the label (if version == xx). The other option is to have the flag in a label (but that needs a small change here). That way, if the code gets removed or changed, the apps will not need to be updated.
Just for the record, Anand and I had a discussion and our conclusion is that this is not a breaking change. I will do several rounds of testing in staging after merging, definitely including the nmodes failure scenario as an example.
For future reference, if we need a feature flag, the recommended way (as Anand pointed out above) is to check a label instead of adding a new field to the CRD. From the app jsonnet file, just add the label if we need to turn on the feature.
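For illustration, a minimal sketch of that label-based approach, assuming a hypothetical label key (this is not code from the PR): the label can be set from the app jsonnet and dropped once the behavior becomes the default, without ever touching the CRD schema.

```go
package flink

// vertexCheckLabel is a hypothetical label key used only for this sketch.
const vertexCheckLabel = "flink.k8s.io/monitor-vertex-state"

// vertexMonitoringEnabled gates the new vertex-state check on an application
// label instead of a dedicated CRD field.
func vertexMonitoringEnabled(labels map[string]string) bool {
	return labels[vertexCheckLabel] == "true"
}
```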
if app.Spec.BetaFeaturesEnabled {
// wait until all vertices have been scheduled and running
logger.Info(ctx, "Beta features flag is enabled.")
jobStartTimeSec := job.StartTime / 1000
Can we put the logic below in a separate function to make this more readable and testable?
I should have clarified that I meant all the new logic of the running vertex state check, not just the start time calculation
After removing the feature flag if condition, I think it's readable now.
func is >100 lines. I think it's somewhat difficult to read. Nit, but for maintainability
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
fmt.Sprintf(
"Vertex %d with name [%s] state is Failed", failedVertexIndex, job.Vertices[failedVertexIndex].Name))
return s.deployFailed(app)
This is new behavior. Is it intentional? More than monitoring, we are updating the state of the deployment here.
This change is not just monitoring but also updating the state of the deployment when conditions are met.
This specific fail-fast part is new, and it's based on the general philosophy that if it fails, fail fast.
I double-checked the Flink doc about the vertex state transition graph: FAILING could retry, while FAILED is the final bad state. So I will update the logic to fail the deployment fast if any vertex state is FAILED.
@anandswaminathan in retrospect this behavior did not follow the original design, as we would like to move to rolling back instead. New PR: #291
ptal @anandswaminathan
Please get a +1 from @sethsaperstein-lyft
hasFailure = true
break
}
allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running
If the first vertex is failing, then won't this return allVerticesRunning as true when it should be false?
If the vertex is failing, hasFailure is set to true, and then the if block at line 768 returns early to fail the deployment.
I see. So it's working because of how the code in the current caller is written, but essentially this method is still buggy: if it's called from somewhere else that doesn't handle the case the way the caller above does, it will return an incorrect result.
Agreed. Initially this part was in the main method, so I didn't set a value for allVerticesRunning in the failure scenario. I will provide the fix later.
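A minimal sketch of that fix, reusing the surrounding identifiers from the diff above (job, client, etc.) and therefore not standalone code: the only change is explicitly clearing allVerticesRunning when a failed or failing vertex is found, so the result is correct even if a caller does not check hasFailure first.

```go
allVerticesRunning := true
hasFailure := false
failedVertexIndex := -1
for index, v := range job.Vertices {
	if v.Status == client.Failed || v.Status == client.Failing {
		failedVertexIndex = index
		hasFailure = true
		allVerticesRunning = false // previously left true on the early break
		break
	}
	allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running
}
```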
Checking.
## overview
In the [job monitoring PR](#284) we introduced a bug such that when job monitoring fails due to a timeout or a failed vertex, the state DeployFailed is reached instead of attempting to roll back. This change simplifies the logic of job submission and job monitoring, and results in the job attempting to roll back.
## additional info
Errors returned by a state in the state machine are added to the status as the last error. The shouldRollback check at the beginning of these states determines whether the error is retryable and moves to rolling back if not. Thus, the change made is to return an error if monitoring results in a failed vertex or a vertex timeout.
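To make that mechanism concrete, a minimal sketch with hypothetical names (not the actual #291 diff): the monitoring step returns an error instead of calling deployFailed directly, so the state machine records it as the last error and its shouldRollback check can move the application to rolling back.

```go
package flink

import "fmt"

type vertexInfo struct {
	Name   string
	Status string
}

// verticesHealthy returns an error for the first FAILED vertex. Returning an
// error, rather than transitioning straight to DeployFailed, lets the state
// machine's generic retry/rollback handling decide the next phase.
func verticesHealthy(vertices []vertexInfo) error {
	for i, v := range vertices {
		if v.Status == "FAILED" {
			return fmt.Errorf("vertex %d with name [%s] is in FAILED state", i, v.Name)
		}
	}
	return nil
}
```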
Context
Currently flinkk8soperator only monitors the Flink job-level overview status, and if the job status is RUNNING, it considers the deployment to have succeeded.
However, we encountered scenarios where some vertices are in bad states and the job keeps crashing and restarting. So the change is to also check the status of all vertices.
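For context, a rough sketch of reading per-vertex status from Flink's job-details REST endpoint; the field names follow the /jobs/{jobid} response as I understand it and should be treated as assumptions rather than the operator's actual client code.

```go
package flink

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// jobDetails models the subset of the /jobs/{jobid} response we care about:
// the overall job state plus each vertex's status and start time.
type jobDetails struct {
	State    string `json:"state"`
	Vertices []struct {
		Name      string `json:"name"`
		Status    string `json:"status"`
		StartTime int64  `json:"start-time"`
	} `json:"vertices"`
}

// fetchJobDetails queries the Flink REST API for a single job's details.
func fetchJobDetails(baseURL, jobID string) (*jobDetails, error) {
	resp, err := http.Get(fmt.Sprintf("%s/jobs/%s", baseURL, jobID))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var details jobDetails
	if err := json.NewDecoder(resp.Body).Decode(&details); err != nil {
		return nil, err
	}
	return &details, nil
}
```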
Design doc
Implementation
- Use a feature flag to control the feature ON/OFF. Initially the default value is false. After we have enough confidence, we will flip the flag or remove the feature flag.
- As discussed with Anand, the TL;DR is that this change is relatively small and I will do a handful of manual tests in staging after merge.
- Fail the deployment fast if any vertex state is FAILED.
- For testing, I tried adding `,` in `programArgs`, assuming it would break args parsing. However, the test flink app is currently pinned to a specific old image; I tried to build a new flink image, but the new image exceeded the memory limit.

Jira link
https://jira.lyft.net/browse/STRMCMP-1640