diff --git a/examples/wordcount/Dockerfile b/examples/wordcount/Dockerfile index 41b28814..c2e01615 100644 --- a/examples/wordcount/Dockerfile +++ b/examples/wordcount/Dockerfile @@ -7,7 +7,7 @@ ENV PATH=$MAVEN_HOME/bin:$PATH # Install dependencies RUN set -ex; \ apt-get update \ - && apt-get -y install gettext-base openjdk-8-jdk-headless \ + && apt-get -y install openjdk-8-jdk-headless \ && rm -rf /var/lib/apt/lists/* # Install Maven @@ -25,6 +25,4 @@ RUN \ JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 mvn package \ && ln -s /code/target $FLINK_HOME/flink-web-upload -COPY docker-entrypoint.sh /flinkk8soperator-entrypoint.sh -ENTRYPOINT ["/flinkk8soperator-entrypoint.sh"] CMD ["help"] diff --git a/examples/wordcount/docker-entrypoint.sh b/examples/wordcount/docker-entrypoint.sh deleted file mode 100755 index db30823c..00000000 --- a/examples/wordcount/docker-entrypoint.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh - -# Map config from FlinkK8sOperator to base container -# https://github.com/lyft/flinkk8soperator/issues/135 -# https://github.com/docker-flink/docker-flink/pull/91 -if [ -n "$OPERATOR_FLINK_CONFIG" ]; then - export FLINK_PROPERTIES="`echo \"${OPERATOR_FLINK_CONFIG}\" | envsubst`" -fi - -exec /docker-entrypoint.sh "$@" diff --git a/integ/operator-test-app/docker-entrypoint.sh b/integ/operator-test-app/docker-entrypoint.sh index 79f4df3f..ef8882b2 100755 --- a/integ/operator-test-app/docker-entrypoint.sh +++ b/integ/operator-test-app/docker-entrypoint.sh @@ -14,8 +14,8 @@ drop_privs_cmd() { } # Add in extra configs set by the operator -if [ -n "$OPERATOR_FLINK_CONFIG" ]; then - echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml" +if [ -n "$FLINK_PROPERTIES" ]; then + echo "$FLINK_PROPERTIES" >> "/usr/local/flink-conf.yaml" fi envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml diff --git a/pkg/controller/flink/container_utils.go b/pkg/controller/flink/container_utils.go index ff9f93ca..38b53bcd 100644 --- a/pkg/controller/flink/container_utils.go +++ b/pkg/controller/flink/container_utils.go @@ -23,7 +23,7 @@ const ( AwsMetadataServiceNumAttemptsKey = "AWS_METADATA_SERVICE_NUM_ATTEMPTS" AwsMetadataServiceTimeout = "5" AwsMetadataServiceNumAttempts = "20" - OperatorFlinkConfig = "OPERATOR_FLINK_CONFIG" + OperatorFlinkConfig = "FLINK_PROPERTIES" HostName = "HOST_NAME" HostIP = "HOST_IP" FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE" @@ -208,6 +208,8 @@ func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1. if deploymentType == FlinkDeploymentTypeTaskmanager { env.Value = fmt.Sprintf("%staskmanager.host: $HOST_IP\n", env.Value) } + // backward compatibility: https://github.com/lyft/flinkk8soperator/issues/135 + newEnv = append(newEnv, v1.EnvVar{Name: "OPERATOR_FLINK_CONFIG", Value: env.Value}) } newEnv = append(newEnv, env) } diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index a44c5374..ba06755f 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -31,7 +31,7 @@ import ( const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2" // Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change -const testAppHash = "8c2d576f" +const testAppHash = "752c76d3" const testAppName = "app-name" const testNamespace = "ns" const testJobID = "j1" diff --git a/pkg/controller/flink/job_manager_controller_test.go b/pkg/controller/flink/job_manager_controller_test.go index a0bda43d..39ab6ac3 100644 --- a/pkg/controller/flink/job_manager_controller_test.go +++ b/pkg/controller/flink/job_manager_controller_test.go @@ -59,7 +59,7 @@ func TestJobManagerCreateSuccess(t *testing.T) { "flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"", } app.Annotations = annotations - hash := "3b2fc68e" + hash := "c3c0af0b" expectedLabels := map[string]string{ "flink-app": "app-name", "flink-app-hash": hash, @@ -91,7 +91,7 @@ func TestJobManagerCreateSuccess(t *testing.T) { "taskmanager.numberOfTaskSlots: 16\n\n"+ "jobmanager.rpc.address: app-name-"+hash+"\n", common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, - "OPERATOR_FLINK_CONFIG").Value) + "FLINK_PROPERTIES").Value) case 2: service := object.(*coreV1.Service) assert.Equal(t, app.Name, service.Name) @@ -136,7 +136,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) { app.Spec.FlinkConfig = map[string]interface{}{ "high-availability": "zookeeper", } - hash := "4a2f1a08" + hash := "52623ded" expectedLabels := map[string]string{ "flink-app": "app-name", "flink-app-hash": hash, @@ -169,7 +169,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) { "high-availability.cluster-id: app-name-"+hash+"\n"+ "jobmanager.rpc.address: $HOST_IP\n", common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, - "OPERATOR_FLINK_CONFIG").Value) + "FLINK_PROPERTIES").Value) case 2: service := object.(*coreV1.Service) assert.Equal(t, app.Name, service.Name) diff --git a/pkg/controller/flink/task_manager_controller_test.go b/pkg/controller/flink/task_manager_controller_test.go index 1a85b7a7..cd25999f 100644 --- a/pkg/controller/flink/task_manager_controller_test.go +++ b/pkg/controller/flink/task_manager_controller_test.go @@ -60,7 +60,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) { "flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"", } - hash := "3b2fc68e" + hash := "c3c0af0b" app.Annotations = annotations expectedLabels := map[string]string{ @@ -87,7 +87,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) { "jobmanager.rpc.address: app-name-"+hash+"\n"+ "taskmanager.host: $HOST_IP\n", common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, - "OPERATOR_FLINK_CONFIG").Value) + "FLINK_PROPERTIES").Value) return nil } @@ -107,7 +107,7 @@ func TestTaskManagerHACreateSuccess(t *testing.T) { "flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"", } - hash := "4a2f1a08" + hash := "52623ded" app.Spec.FlinkConfig = map[string]interface{}{ "high-availability": "zookeeper", } @@ -135,6 +135,11 @@ func TestTaskManagerHACreateSuccess(t *testing.T) { "taskmanager.numberOfTaskSlots: 16\n\n"+ "high-availability.cluster-id: app-name-"+hash+"\n"+ "taskmanager.host: $HOST_IP\n", + common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, + "FLINK_PROPERTIES").Value) + // backward compatibility: https://github.com/lyft/flinkk8soperator/issues/135 + assert.Equal(t, common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, + "FLINK_PROPERTIES").Value, common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, "OPERATOR_FLINK_CONFIG").Value)