-
Notifications
You must be signed in to change notification settings - Fork 159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Backward Incompatible] Add env variables for Flink managers and fix for JM HA #45
Changes from 6 commits
5bb866f
ddbf570
b1b3cb8
7f4c81e
74a00c7
e648b81
a2109cb
140704f
0e96ca9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,9 @@ const ( | |
AwsMetadataServiceTimeout = "5" | ||
AwsMetadataServiceNumAttempts = "20" | ||
OperatorFlinkConfig = "OPERATOR_FLINK_CONFIG" | ||
HostName = "HOST_NAME" | ||
HostIP = "HOST_IP" | ||
FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE" | ||
FlinkDeploymentType = "flink-deployment-type" | ||
FlinkDeploymentTypeJobmanager = "jobmanager" | ||
FlinkDeploymentTypeTaskmanager = "taskmanager" | ||
|
@@ -87,6 +90,22 @@ func getFlinkEnv(app *v1alpha1.FlinkApplication) ([]v1.EnvVar, error) { | |
Name: OperatorFlinkConfig, | ||
Value: flinkConfig, | ||
}, | ||
{ | ||
Name: HostName, | ||
ValueFrom: &v1.EnvVarSource{ | ||
FieldRef: &v1.ObjectFieldSelector{ | ||
FieldPath: "metadata.name", | ||
}, | ||
}, | ||
}, | ||
{ | ||
Name: HostIP, | ||
ValueFrom: &v1.EnvVarSource{ | ||
FieldRef: &v1.ObjectFieldSelector{ | ||
FieldPath: "status.podIP", | ||
}, | ||
}, | ||
}, | ||
}...) | ||
return env, nil | ||
} | ||
|
@@ -155,14 +174,21 @@ func HashForApplication(app *v1alpha1.FlinkApplication) string { | |
return fmt.Sprintf("%08x", hasher.Sum32()) | ||
} | ||
|
||
func InjectHashesIntoConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string) { | ||
func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string, deploymentType string) { | ||
var newContainers []v1.Container | ||
for _, container := range deployment.Spec.Template.Spec.Containers { | ||
var newEnv []v1.EnvVar | ||
for _, env := range container.Env { | ||
if env.Name == OperatorFlinkConfig { | ||
env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash) | ||
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash)) | ||
if getJobmanagerReplicas(app) == 1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It feels like this check should be whether HA is enabled, not how many JMs there are. If HA is enabled, we probably want to use it for discovery even if we use a single JM (this is the configuration we currently run in production). You can tell HA is enabled by looking for the [inline code lost in extraction]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash)) | ||
} else if deploymentType == FlinkDeploymentTypeJobmanager { | ||
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: $HOST_IP\n", env.Value) | ||
} | ||
if deploymentType == FlinkDeploymentTypeTaskmanager { | ||
env.Value = fmt.Sprintf("%staskmanager.host: $HOST_IP\n", env.Value) | ||
} | ||
} | ||
newEnv = append(newEnv, env) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ const ( | |
JobManagerPodNameFormat = "%s-%s-jm-pod" | ||
JobManagerContainerName = "jobmanager" | ||
JobManagerArg = "jobmanager" | ||
JobManagerReadinessPath = "/config" | ||
JobManagerReadinessPath = "/overview" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note that in the newer Flink versions, both JobManagers (leader and follower) respond to API requests. I noticed that JM responds [remainder of comment lost in extraction]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like both the leader and the follower in the latest JMs are able to respond to REST requests. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, that's what I'm seeing as well. I think that's actually fine; the follower should redirect the request to the leader. |
||
JobManagerReadinessInitialDelaySec = 10 | ||
JobManagerReadinessTimeoutSec = 1 | ||
JobManagerReadinessSuccessThreshold = 1 | ||
|
@@ -247,6 +247,10 @@ func FetchJobManagerContainerObj(application *v1alpha1.FlinkApplication) *coreV1 | |
|
||
ports := getJobManagerPorts(application) | ||
operatorEnv := GetFlinkContainerEnv(application) | ||
operatorEnv = append(operatorEnv, coreV1.EnvVar{ | ||
Name: FlinkDeploymentTypeEnv, | ||
Value: FlinkDeploymentTypeJobmanager, | ||
}) | ||
operatorEnv = append(operatorEnv, jmConfig.Environment.Env...) | ||
|
||
return &coreV1.Container{ | ||
|
@@ -342,7 +346,7 @@ func FetchJobMangerDeploymentCreateObj(app *v1alpha1.FlinkApplication, hash stri | |
template.Spec.Selector.MatchLabels[FlinkAppHash] = hash | ||
template.Spec.Template.Name = getJobManagerPodName(app, hash) | ||
|
||
InjectHashesIntoConfig(template, app, hash) | ||
InjectOperatorCustomizedConfig(template, app, hash, FlinkDeploymentTypeJobmanager) | ||
|
||
return template | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option is to do [code snippet lost in extraction]
and remove that logic from the operator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My preference is to have it in the operator (as you've done), as this gives us more flexibility (the operator is much easier to change than the images).