[Backward Incompatible] Add env variables for Flink managers and fix for JM HA #45

Merged
merged 9 commits into from
Jul 17, 2019
Changes from 4 commits
18 changes: 5 additions & 13 deletions examples/wordcount/docker-entrypoint.sh
@@ -13,21 +13,13 @@ drop_privs_cmd() {
     fi
 }

-envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
-
-# As the taskmanager pods are accessible only by (cluster) ip address,
-# we must manually configure this based on the podIp kubernetes
-# variable, which is assigned to TASKMANAGER_HOSTNAME env var by the
-# operator.
-if [ -n "$TASKMANAGER_HOSTNAME" ]; then
-    echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
-fi
-
 # Add in extra configs set by the operator
 if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
-    echo "$OPERATOR_FLINK_CONFIG" >> "$FLINK_HOME/conf/flink-conf.yaml"
+    echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
 fi

+envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
+
 COMMAND=$@

 if [ $# -lt 1 ]; then
@@ -37,11 +29,11 @@ fi
 if [ "$COMMAND" = "help" ]; then
     echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
     exit 0
-elif [ "$COMMAND" = "jobmanager" ]; then
+elif [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
     echo "Starting Job Manager"
     echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
     exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
-elif [ "$COMMAND" = "taskmanager" ]; then
+elif [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
     echo "Starting Task Manager"
     echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
     exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
4 changes: 2 additions & 2 deletions integ/README.md
@@ -23,8 +23,8 @@ By default the tests create, use, and clean up the namespace
 These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The
 tests currently use two images built from here:

-* `lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.1`
-* `lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.2`
+* `lyft/operator-test-app:ddbf570fe838205040c226807b0285a326a7d4c3.1`
+* `lyft/operator-test-app:ddbf570fe838205040c226807b0285a326a7d4c3.2`

 Those images are available on our private Dockerhub registry, and you
 will either need to pull them locally or give Kubernetes access to the
2 changes: 1 addition & 1 deletion integ/operator-test-app/Dockerfile
@@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
 COPY . /code

 # Configure Flink version
-ENV FLINK_VERSION=1.8.0 \
+ENV FLINK_VERSION=1.8.1 \
     HADOOP_SCALA_VARIANT=scala_2.12

 # Install dependencies
18 changes: 5 additions & 13 deletions integ/operator-test-app/docker-entrypoint.sh
@@ -13,21 +13,13 @@ drop_privs_cmd() {
     fi
 }

-envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
-
-# As the taskmanager pods are accessible only by (cluster) ip address,
-# we must manually configure this based on the podIp kubernetes
-# variable, which is assigned to TASKMANAGER_HOSTNAME env var by the
-# operator.
-if [ -n "$TASKMANAGER_HOSTNAME" ]; then
-    echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
-fi
-
 # Add in extra configs set by the operator
 if [ -n "$OPERATOR_FLINK_CONFIG" ]; then

Contributor Author

Another option is to do

+if [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
+    echo "jobmanager.rpc.address: $HOST_NAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
+fi
+
 # As the taskmanager pods are accessible only by (cluster) ip address,
 # we must manually configure this based on the podIp kubernetes
+if [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
+    echo "taskmanager.host: $HOST_IP" >> "$FLINK_HOME/conf/flink-conf.yaml"
 fi

And remove that logic from the operator.

Contributor

My preference is to have it in the operator (as you've done), as this gives us more flexibility (the operator is much easier to change than the images).

-    echo "$OPERATOR_FLINK_CONFIG" >> "$FLINK_HOME/conf/flink-conf.yaml"
+    echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
 fi

+envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
+
 COMMAND=$@

 if [ $# -lt 1 ]; then
@@ -37,11 +29,11 @@ fi
 if [ "$COMMAND" = "help" ]; then
     echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
     exit 0
-elif [ "$COMMAND" = "jobmanager" ]; then
+elif [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
     echo "Starting Job Manager"
     echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
     exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
-elif [ "$COMMAND" = "taskmanager" ]; then
+elif [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
     echo "Starting Task Manager"
     echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
     exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
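
Note on the ordering change in this entrypoint: the operator-provided fragment is now appended to the template before `envsubst` runs, so placeholders such as `$HOST_NAME` and `$HOST_IP` inside `OPERATOR_FLINK_CONFIG` get resolved in the pod. With the previous order (substitute first, append afterwards) they would presumably have stayed literal. The following is only a rough Go sketch of that behaviour, using `os.Expand` as a stand-in for `envsubst`; the function name `renderConfig`, the sample template, and the pod name are hypothetical and not taken from this PR.

```go
package main

import (
	"fmt"
	"os"
)

// renderConfig mimics the new order of operations in the entrypoint:
// first append the operator-provided fragment to the template, then expand
// $VAR placeholders. os.Expand is only an approximation of envsubst; in this
// sketch any variable missing from env expands to the empty string.
func renderConfig(template, operatorConfig string, env map[string]string) string {
	combined := template + operatorConfig
	return os.Expand(combined, func(name string) string { return env[name] })
}

func main() {
	// Hypothetical template contents and pod name, for illustration only.
	template := "blob.server.port: 6125\n"
	operatorConfig := "jobmanager.rpc.address: $HOST_NAME\n"
	env := map[string]string{"HOST_NAME": "example-jobmanager-pod"}

	fmt.Print(renderConfig(template, operatorConfig, env))
}
```

Because the placeholder is resolved inside the container, the operator can write the same literal string into every pod spec while each pod still ends up with its own name or IP.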
2 changes: 1 addition & 1 deletion integ/simple_test.go
@@ -15,7 +15,7 @@ import (
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

-const NewImage = "lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.2"
+const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"

 func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1alpha1.FlinkApplication), failurePhase v1alpha1.FlinkApplicationPhase) *v1alpha1.FlinkApplication {
 	app, err := s.Util.GetFlinkApplication(name)
2 changes: 1 addition & 1 deletion integ/test_app.yaml
@@ -6,7 +6,7 @@ metadata:
   labels:
     environment: development
 spec:
-  image: lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.1
+  image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
   imagePullSecrets:
     - name: dockerhub
   flinkConfig:
29 changes: 27 additions & 2 deletions pkg/controller/flink/container_utils.go
@@ -24,6 +24,9 @@ const (
 	AwsMetadataServiceTimeout = "5"
 	AwsMetadataServiceNumAttempts = "20"
 	OperatorFlinkConfig = "OPERATOR_FLINK_CONFIG"
+	hostName = "HOST_NAME"
+	hostIP = "HOST_IP"
+	FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE"
 	FlinkDeploymentType = "flink-deployment-type"
 	FlinkDeploymentTypeJobmanager = "jobmanager"
 	FlinkDeploymentTypeTaskmanager = "taskmanager"
@@ -87,6 +90,22 @@ func getFlinkEnv(app *v1alpha1.FlinkApplication) ([]v1.EnvVar, error) {
 			Name:  OperatorFlinkConfig,
 			Value: flinkConfig,
 		},
+		{
+			Name: hostName,
+			ValueFrom: &v1.EnvVarSource{
+				FieldRef: &v1.ObjectFieldSelector{
+					FieldPath: "metadata.name",
+				},
+			},
+		},
+		{
+			Name: hostIP,
+			ValueFrom: &v1.EnvVarSource{
+				FieldRef: &v1.ObjectFieldSelector{
+					FieldPath: "status.podIP",
+				},
+			},
+		},
 	}...)
 	return env, nil
 }
@@ -155,14 +174,20 @@ func HashForApplication(app *v1alpha1.FlinkApplication) string {
 	return fmt.Sprintf("%08x", hasher.Sum32())
 }

-func InjectHashesIntoConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string) {
+func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string, deploymentType string) {
 	var newContainers []v1.Container
 	for _, container := range deployment.Spec.Template.Spec.Containers {
 		var newEnv []v1.EnvVar
 		for _, env := range container.Env {
 			if env.Name == OperatorFlinkConfig {
 				env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
-				env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
+				if deploymentType == FlinkDeploymentTypeJobmanager {
+					env.Value = fmt.Sprintf("%sjobmanager.rpc.address: $HOST_NAME\n", env.Value)
+				}
+				if deploymentType == FlinkDeploymentTypeTaskmanager {
+					env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
+					env.Value = fmt.Sprintf("%staskmanager.host: $HOST_IP\n", env.Value)
+				}
 			}
 			newEnv = append(newEnv, env)
 		}
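
For readers following the hunk above, the per-deployment-type branching can be restated as a small standalone function. This is a simplified sketch, not the operator's code: `operatorConfig` and the lowercase constants are hypothetical names, and the JobManager service name is assumed to be `<app name>-<hash>` because that is what the unit test expectations in this PR show for `VersionedJobManagerServiceName`.

```go
package main

import "fmt"

// Hypothetical stand-ins for FlinkDeploymentTypeJobmanager / FlinkDeploymentTypeTaskmanager.
const (
	deploymentTypeJobmanager  = "jobmanager"
	deploymentTypeTaskmanager = "taskmanager"
)

// operatorConfig appends the operator-managed keys to an existing config string.
// JobManager pods advertise their own pod name through the $HOST_NAME placeholder;
// TaskManager pods point at the versioned JobManager service and advertise their
// pod IP through $HOST_IP. The placeholders are left literal here and are meant
// to be resolved later inside the container.
func operatorConfig(base, appName, hash, deploymentType string) string {
	cfg := fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", base, appName, hash)
	switch deploymentType {
	case deploymentTypeJobmanager:
		cfg += "jobmanager.rpc.address: $HOST_NAME\n"
	case deploymentTypeTaskmanager:
		// Assumption: the versioned service name has the form "<app>-<hash>".
		cfg += fmt.Sprintf("jobmanager.rpc.address: %s-%s\n", appName, hash)
		cfg += "taskmanager.host: $HOST_IP\n"
	}
	return cfg
}

func main() {
	base := "taskmanager.numberOfTaskSlots: 16\n"
	fmt.Print(operatorConfig(base, "app-name", "0b1b39e8", deploymentTypeJobmanager))
	fmt.Println("---")
	fmt.Print(operatorConfig(base, "app-name", "0b1b39e8", deploymentTypeTaskmanager))
}
```

A `switch` is used here only for brevity; the PR itself uses two independent `if` statements, which behave the same for these two deployment types.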
2 changes: 1 addition & 1 deletion pkg/controller/flink/flink_test.go
@@ -29,7 +29,7 @@ import (
 const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"

 // Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
-const testAppHash = "cb56c9a1"
+const testAppHash = "0de896bc"
 const testAppName = "app-name"
 const testNamespace = "ns"
 const testJobID = "j1"
6 changes: 5 additions & 1 deletion pkg/controller/flink/job_manager_controller.go
@@ -247,6 +247,10 @@ func FetchJobManagerContainerObj(application *v1alpha1.FlinkApplication) *coreV1

 	ports := getJobManagerPorts(application)
 	operatorEnv := GetFlinkContainerEnv(application)
+	operatorEnv = append(operatorEnv, coreV1.EnvVar{
+		Name:  FlinkDeploymentTypeEnv,
+		Value: FlinkDeploymentTypeJobmanager,
+	})
 	operatorEnv = append(operatorEnv, jmConfig.Environment.Env...)

 	return &coreV1.Container{
@@ -342,7 +346,7 @@ func FetchJobMangerDeploymentCreateObj(app *v1alpha1.FlinkApplication, hash stri
 	template.Spec.Selector.MatchLabels[FlinkAppHash] = hash
 	template.Spec.Template.Name = getJobManagerPodName(app, hash)

-	InjectHashesIntoConfig(template, app, hash)
+	InjectOperatorCustomizedConfig(template, app, hash, FlinkDeploymentTypeJobmanager)

 	return template
 }
4 changes: 2 additions & 2 deletions pkg/controller/flink/job_manager_controller_test.go
@@ -51,7 +51,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
 		"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
 	}
 	app.Annotations = annotations
-	hash := "334c7c5d"
+	hash := "0b1b39e8"
 	expectedLabels := map[string]string{
 		"flink-app": "app-name",
 		"flink-app-hash": hash,
@@ -82,7 +82,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
 				"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
 				"taskmanager.numberOfTaskSlots: 16\n\n"+
 				"high-availability.cluster-id: app-name-"+hash+"\n"+
-				"jobmanager.rpc.address: app-name-"+hash+"\n",
+				"jobmanager.rpc.address: $HOST_NAME\n",
 				common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
 					"OPERATOR_FLINK_CONFIG").Value)
 		case 2:
11 changes: 3 additions & 8 deletions pkg/controller/flink/task_manager_controller.go
@@ -120,14 +120,9 @@ func FetchTaskManagerContainerObj(application *v1alpha1.FlinkApplication) *coreV
 	}

 	operatorEnv := GetFlinkContainerEnv(application)
-
 	operatorEnv = append(operatorEnv, coreV1.EnvVar{
-		Name: TaskManagerHostnameEnvVar,
-		ValueFrom: &coreV1.EnvVarSource{
-			FieldRef: &coreV1.ObjectFieldSelector{
-				FieldPath: "status.podIP",
-			},
-		},
+		Name:  FlinkDeploymentTypeEnv,
+		Value: FlinkDeploymentTypeTaskmanager,
 	})

 	operatorEnv = append(operatorEnv, tmConfig.Environment.Env...)
@@ -228,7 +223,7 @@ func FetchTaskMangerDeploymentCreateObj(app *v1alpha1.FlinkApplication, hash str
 	template.Spec.Selector.MatchLabels[FlinkAppHash] = hash
 	template.Spec.Template.Name = getTaskManagerPodName(app, hash)

-	InjectHashesIntoConfig(template, app, hash)
+	InjectOperatorCustomizedConfig(template, app, hash, FlinkDeploymentTypeTaskmanager)

 	return template
 }
5 changes: 3 additions & 2 deletions pkg/controller/flink/task_manager_controller_test.go
@@ -60,7 +60,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
 		"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
 	}

-	hash := "334c7c5d"
+	hash := "0b1b39e8"

 	app.Annotations = annotations
 	expectedLabels := map[string]string{
@@ -85,7 +85,8 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
 			"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
 			"taskmanager.numberOfTaskSlots: 16\n\n"+
 			"high-availability.cluster-id: app-name-"+hash+"\n"+
-			"jobmanager.rpc.address: app-name-"+hash+"\n",
+			"jobmanager.rpc.address: app-name-"+hash+"\n"+
+			"taskmanager.host: $HOST_IP\n",
 			common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
 				"OPERATOR_FLINK_CONFIG").Value)
