[Backward Incompatible] Add env variables for Flink managers and fix for JM HA #45

Merged: 9 commits, Jul 17, 2019


anandswaminathan
Contributor

We want the ability to set some of the Flink config values dynamically, based on values such as the host name or IP address.

Currently, both the operator and the image understand that the jobmanager and the taskmanager behave differently. In this particular context, we want "jobmanager.rpc.address" to be the pod name on the job manager and the service name on the task manager.

I iterated across multiple options and arrived at this one. I also updated the Dockerfiles in the examples and tests to reflect it. The changes below could be simpler, with all the responsibility for setting correct values delegated to the image. But since the operator already irons out a lot of things and sets values correctly, I moved some of the logic to the operator. I understand this is subjective and up for discussion. Happy to take comments and change the logic.
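To make the intended behavior concrete, here is a minimal sketch of the role-dependent choice described above. It is not the operator's actual code: the function `rpcAddressFor`, the role constants, and the example pod and service names are all hypothetical and exist only for illustration.

```go
package main

import "fmt"

// Hypothetical role labels for this sketch; the real operator may use
// different identifiers.
const (
	roleJobManager  = "jobmanager"
	roleTaskManager = "taskmanager"
)

// rpcAddressFor returns the value to write for jobmanager.rpc.address:
// the pod name on a job manager, the service name on a task manager.
func rpcAddressFor(role, podName, serviceName string) (string, error) {
	switch role {
	case roleJobManager:
		return podName, nil
	case roleTaskManager:
		return serviceName, nil
	default:
		return "", fmt.Errorf("unknown role %q", role)
	}
}

func main() {
	// The pod and service names below are made-up examples.
	for _, role := range []string{roleJobManager, roleTaskManager} {
		addr, err := rpcAddressFor(role, "example-jm-pod-0", "example-jm-service")
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> jobmanager.rpc.address: %s\n", role, addr)
	}
}
```

Whether this decision lives in the operator or in the image's entrypoint is the design question raised in this PR; the sketch only shows the mapping itself.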

cc @mwylde @glaksh100

if [ -n "$TASKMANAGER_HOSTNAME" ]; then
    echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
fi

# Add in extra configs set by the operator
if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
Contributor Author

Another option is to do the following:

+if [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
+    echo "jobmanager.rpc.address: $HOST_NAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
+fi
+
 # As the taskmanager pods are accessible only by (cluster) ip address,
 # we must manually configure this based on the podIp kubernetes
+if [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
+    echo "taskmanager.host: $HOST_IP" >> "$FLINK_HOME/conf/flink-conf.yaml"
 fi

And remove that logic from the operator.

Contributor

My preference is to have it in the operator (as you've done), as this gives us more flexibility (the operator is much easier to change than the images).

@anandswaminathan
Contributor Author

@mwylde @glaksh100 Tested in a cluster for both HA and non-HA setups. Things work fine.

@@ -24,7 +24,7 @@ const (
     JobManagerPodNameFormat = "%s-%s-jm-pod"
     JobManagerContainerName = "jobmanager"
     JobManagerArg = "jobmanager"
-    JobManagerReadinessPath = "/config"
+    JobManagerReadinessPath = "/overview"
Contributor Author

Note that in newer Flink versions, both JobManagers (leader and follower) respond to API requests.

I noticed that the JM responds to /config even when it is not able to process other requests.

Contributor Author

It looks like both the leader and the follower in the latest JMs are able to respond to REST requests.

Contributor

Yeah, that's what I'm seeing as well. I think that's actually fine: the follower should redirect the request to the leader.
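The reason for moving the readiness path from /config to /overview can be illustrated with a small sketch. Everything here is an assumption for demonstration: `ready`, `fakeJobManager`, and `probeTimeout` are hypothetical names, and the stand-in server simply models the observation in this thread that /config answers even when other requests cannot be processed (a 503 on /overview is a guess at how that might look, not documented Flink behavior).

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// probeTimeout is an arbitrary timeout chosen for this sketch.
const probeTimeout = time.Second

// ready reports whether a GET on baseURL+path returns HTTP 200,
// which is roughly what an HTTP readiness probe checks.
func ready(baseURL, path string) bool {
	client := &http.Client{Timeout: probeTimeout}
	resp, err := client.Get(baseURL + path)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}

// fakeJobManager starts a stand-in server (not Flink): /config always
// answers 200, while /overview answers 200 only when canServe is true.
func fakeJobManager(canServe bool) *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/config", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/overview", func(w http.ResponseWriter, _ *http.Request) {
		if !canServe {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := fakeJobManager(false)
	defer srv.Close()
	fmt.Println("/config ready:", ready(srv.URL, "/config"))
	fmt.Println("/overview ready:", ready(srv.URL, "/overview"))
}
```

Under these assumptions, a probe on /config would mark the pod ready too early, while a probe on /overview would not.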

var newContainers []v1.Container
for _, container := range deployment.Spec.Template.Spec.Containers {
    var newEnv []v1.EnvVar
    for _, env := range container.Env {
        if env.Name == OperatorFlinkConfig {
            env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
            env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
            if getJobmanagerReplicas(app) == 1 {
Contributor

It feels like this check should be whether HA is enabled, not how many JMs there are. If HA is enabled we probably want to use it for discovery even if we use a single JM (this is the configuration we currently run in production).

You can tell HA is enabled by looking for the high-availability key in the config.
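A minimal sketch of the suggested check follows. It assumes the Flink config is available as a flat string map; the helper name `haEnabled` is hypothetical, and treating an empty value or "NONE" as disabled is an assumption of this sketch rather than something stated in the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// haKey is the config key mentioned in the review comment.
const haKey = "high-availability"

// haEnabled reports whether the config carries a high-availability
// setting. Assumption: an empty value or "NONE" (any case) means HA
// is disabled.
func haEnabled(flinkConfig map[string]string) bool {
	v, ok := flinkConfig[haKey]
	if !ok {
		return false
	}
	v = strings.TrimSpace(v)
	return v != "" && !strings.EqualFold(v, "none")
}

func main() {
	withHA := map[string]string{haKey: "zookeeper"}
	withoutHA := map[string]string{"state.backend": "filesystem"}
	fmt.Println("HA enabled:", haEnabled(withHA))
	fmt.Println("HA enabled:", haEnabled(withoutHA))
}
```

Keying the decision on the config instead of the replica count covers the case described above, where HA is enabled with a single JM.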

Contributor Author

Done

@anandswaminathan
Contributor Author

Note that after this change, you will need to update the underlying image. I have updated the example and test as well.

cc @YuvalItzchakov @yuchaoran2011
