Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A few minor fixes #68

Merged
merged 5 commits into from
Aug 8, 2019
Merged

Conversation

yuchaoran2011
Copy link
Contributor

If we re-configure the CR object of a running FlinkApp by adding more resources (e.g. increasing parallelism or the number of task slots), then a new Flink cluster will be started with the new configurations. But a bug in the operator prevented the old cluster from being deleted. This PR fixed this bug.

if err != nil {
logger.Warn(ctx, "Failed to clean up old resources: %v", err)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These several lines take care of deleting the old cluster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuchaoran2011 This does not look right.

The cluster is deleted in the Running state.

The sequence is Running - Updating to Savepointing to ClusterStarting - SubmittingJob and back to Running in the new cluster.

Only after the job is running in the new cluster the old cluster is deleted. This is done in handleApplicationRunning. Let me know if you are noticing something else

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not what I observed though. What I saw was that when the application is running, after I apply some changes to the CR, then a new cluster is started. The application then starts running in the new cluster, but the old cluster is still around/not deleted. You should be able to reproduce it by doing what I described.

Copy link
Contributor

@anandswaminathan anandswaminathan Aug 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have integration tests to verify. Wondering if we missed an edge case. Can you paste the status/output of the application resource, and deployments?

This is the code for deletion - https://github.com/lyft/flinkk8soperator/blob/master/pkg/controller/flinkapplication/flink_state_machine.go#L545

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you meant now. You got a valid point. Let me check with my colleague who reported the issue to me. If his application did not successfully savepoint, then the old cluster not being deleted by the operator is expected. Only if his application did successfully savepoint, then is it an operator problem. I'll get back to you when I hear from him.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to use the new tag of the image as mentioned above .. getting an exception starting Task Managers ..

2019-08-07 06:16:26,061 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - TaskManager initialization failed.
java.lang.Exception: Could not create actor system
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:267)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:153)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:112)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:87)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRpcService(AkkaRpcServiceUtils.java:84)
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.createRpcService(TaskManagerRunner.java:412)
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:134)
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:332)
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:302)
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:299)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:299)
Caused by: com.typesafe.config.ConfigException$Parse: String: 55: Expecting a value but got wrong token: 'h' ('$' not followed by {, 'h' not allowed after '$') (if you intended 'h' ('$' not followed by {, 'h' not allowed after '$') to be part of a key or string value, try enclosing the key or value in double quotes, or you may be able to rename the file .properties rather than .conf)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseError(ConfigDocumentParser.java:201)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseError(ConfigDocumentParser.java:197)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:251)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:475)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:458)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:458)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:458)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:458)
	at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parse(ConfigDocumentParser.java:648)
	at com.typesafe.config.impl.ConfigDocumentParser.parse(ConfigDocumentParser.java:14)
	at com.typesafe.config.impl.Parseable.rawParseValue(Parseable.java:262)
	at com.typesafe.config.impl.Parseable.rawParseValue(Parseable.java:250)
	at com.typesafe.config.impl.Parseable.parseValue(Parseable.java:180)
	at com.typesafe.config.impl.Parseable.parseValue(Parseable.java:174)
	at com.typesafe.config.impl.Parseable.parse(Parseable.java:301)
	at com.typesafe.config.ConfigFactory.parseString(ConfigFactory.java:1051)
	at com.typesafe.config.ConfigFactory.parseString(ConfigFactory.java:1061)
	at org.apache.flink.runtime.akka.AkkaUtils$.getRemoteAkkaConfig(AkkaUtils.scala:601)
	at org.apache.flink.runtime.akka.AkkaUtils$.getAkkaConfig(AkkaUtils.scala:221)
	at org.apache.flink.runtime.akka.AkkaUtils.getAkkaConfig(AkkaUtils.scala)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:247)
	... 11 more

Incidentally I get the same error with image tag 0.1.3, which I tried just now. We had been using 0.1.2 which does not give this exception.

By the way we run Flink 1.8 ..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to update the Docker image. Check this - 373cb21

We recently pushed a backward incompatible change.

Copy link

@debasishg debasishg Aug 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah .. I changed the image in flinkk8soperator.yaml .. do I need to do anything else ?

containers:
      - name: flinkoperator-gojson
        image: docker.io/lyft/flinkk8soperator:500fe6bd40da8efca4a48bbb1104896be2c1fae8

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going through the change .. in lines 186 and 192 of pkg/controller/flink/container_utils.go ..

did u mean $HOST_IP or should they be ${?HOST_IP} if they are meant to be picked up for processing by Akka config in Flink ?

My hunch was purely based on the error that we face where it complains about $ being followed by h without a { ..

Copy link
Contributor

@anandswaminathan anandswaminathan Aug 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not referring to the flink operator image. This is the application image.

You will need to update your application image to be able to run with the latest version of the operator. Check the changes here - 373cb21

You need to pick the latest changes here - https://github.com/lyft/flinkk8soperator/blob/master/examples/wordcount/docker-entrypoint.sh - We added a bunch of new env variables relating to bug fix. These variables need to be substituted with appropriate values.

@debasishg
Copy link

debasishg commented Aug 7, 2019 via email

@anandswaminathan
Copy link
Contributor

anandswaminathan commented Aug 8, 2019

@debasishg @yuchaoran2011

Check #69 for resolution on the "hash collision" issue. (You need to add the default values as part of your application for Volumes)

@debasishg
Copy link

debasishg commented Aug 8, 2019

Thanks @anandswaminathan .. after adding the defaults looks like it progresses. But now I get this error ..

{"json":{"app_name":"taxi-ride-fare-processor","ns":"taxi-ride-fare","phase":"SubmittingJob"},"level":"info","msg":"Handling state for application","ts":"2019-08-08T06:38:46Z"}
{"json":{"app_name":"taxi-ride-fare-processor","ns":"taxi-ride-fare","phase":"SubmittingJob"},"level":"info","msg":"No active job found for the application []","ts":"2019-08-08T06:38:46Z"}
{"json":{"app_name":"taxi-ride-fare-processor","ns":"taxi-ride-fare","phase":"SubmittingJob"},"level":"warning","msg":"Job submission failed with response {\"errors\":[\"Jar file /tmp/flink-web-c84df002-a203-41a6-9f1d-6731a898241d/flink-web-upload/runner.jar does not exist\"]}","ts":"2019-08-08T06:38:46Z"}
{"json":{"app_name":"taxi-ride-fare-processor","ns":"taxi-ride-fare","phase":"SubmittingJob"},"level":"info","msg":"Logged Warning event: JobSubmissionFailed: Failed to submit job to cluster for deploy 25b8ff55: SubmitJob call failed with status 400 Bad Request and message [{\"errors\":[\"Jar file /tmp/flink-web-c84df002-a203-41a6-9f1d-6731a898241d/flink-web-upload/runner.jar does not exist\"]}]","ts":"2019-08-08T06:38:46Z"}
{"json":{"app_name":"taxi-ride-fare-processor","ns":"taxi-ride-fare","phase":"SubmittingJob"},"level":"warning","msg":"Failed to reconcile resource taxi-ride-fare/taxi-ride-fare-processor: SubmitJob call failed with status 400 Bad Request and message [{\"errors\":[\"Jar file /tmp/flink-web-c84df002-a203-41a6-9f1d-6731a898241d/flink-web-upload/runner.jar does not exist\"]}]","ts":"2019-08-08T06:38:46Z"}

It says Handling state which is good but why does it look for the jar under /tmp? I copied the jar under /opt/flink/flink-web-upload and I confirmed in the image it's there.

@debasishg
Copy link

@anandswaminathan BTW this happens when I try to scale the application with one more Task Manager

@debasishg
Copy link

@anandswaminathan ah .. got what the problem was .. web.upload.dir was not getting set. Now with scaling it kills the older pods and the new ones are running. Thanks! /cc @yuchaoran2011

@yuchaoran2011 yuchaoran2011 changed the title Fixed bug where the old cluster is not terminated A few minor fixes Aug 8, 2019
@yuchaoran2011
Copy link
Contributor Author

Thanks @anandswaminathan and @mwylde for the help! Now that @debasishg has everything working, I've updated my PR to remove stuff that shouldn't be there. Now this PR has been reduced to a trivial one. Feel free to merge.

@anandswaminathan anandswaminathan merged commit bdd8996 into lyft:master Aug 8, 2019
@yuchaoran2011 yuchaoran2011 deleted the fixDeleteCluster branch August 20, 2019 20:59
dedibit66 pushed a commit to dedibit66/flinkk8soperator that referenced this pull request Sep 24, 2019
Reverts lyft/flinkk8soperator#68

The Operator was not listening to events from CRD, as the default pflags was passing in "*". To make operator listen to all namespaces, just pass in empty string "".

We have check to log "*" for namespace instead of empty for readability purposes. Added comment for clarity.

- Cleaning up Lyft examples as we have examples outside.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants