Skip to content

Commit

Permalink
Provide operator configuration via FLINK_PROPERTIES fixes #135 (#149)
Browse files Browse the repository at this point in the history
Note that this change modifies the hash of the application and will cause a restart. The change is backward compatible, since the now deprecated environment variable `OPERATOR_FLINK_CONFIG` is still provided to the containers for a transition period.
  • Loading branch information
tweise committed Dec 24, 2019
1 parent 8460804 commit 2a5baaf
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 24 deletions.
4 changes: 1 addition & 3 deletions examples/wordcount/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ENV PATH=$MAVEN_HOME/bin:$PATH
# Install dependencies
RUN set -ex; \
apt-get update \
&& apt-get -y install gettext-base openjdk-8-jdk-headless \
&& apt-get -y install openjdk-8-jdk-headless \
&& rm -rf /var/lib/apt/lists/*

# Install Maven
Expand All @@ -25,6 +25,4 @@ RUN \
JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 mvn package \
&& ln -s /code/target $FLINK_HOME/flink-web-upload

COPY docker-entrypoint.sh /flinkk8soperator-entrypoint.sh
ENTRYPOINT ["/flinkk8soperator-entrypoint.sh"]
CMD ["help"]
10 changes: 0 additions & 10 deletions examples/wordcount/docker-entrypoint.sh

This file was deleted.

4 changes: 2 additions & 2 deletions integ/operator-test-app/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ drop_privs_cmd() {
}

# Add in extra configs set by the operator
if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
if [ -n "$FLINK_PROPERTIES" ]; then
echo "$FLINK_PROPERTIES" >> "/usr/local/flink-conf.yaml"
fi

envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/flink/container_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
AwsMetadataServiceNumAttemptsKey = "AWS_METADATA_SERVICE_NUM_ATTEMPTS"
AwsMetadataServiceTimeout = "5"
AwsMetadataServiceNumAttempts = "20"
OperatorFlinkConfig = "OPERATOR_FLINK_CONFIG"
OperatorFlinkConfig = "FLINK_PROPERTIES"
HostName = "HOST_NAME"
HostIP = "HOST_IP"
FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE"
Expand Down Expand Up @@ -208,6 +208,8 @@ func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1.
if deploymentType == FlinkDeploymentTypeTaskmanager {
env.Value = fmt.Sprintf("%staskmanager.host: $HOST_IP\n", env.Value)
}
// backward compatibility: https://github.com/lyft/flinkk8soperator/issues/135
newEnv = append(newEnv, v1.EnvVar{Name: "OPERATOR_FLINK_CONFIG", Value: env.Value})
}
newEnv = append(newEnv, env)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"

// Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
const testAppHash = "8c2d576f"
const testAppHash = "752c76d3"
const testAppName = "app-name"
const testNamespace = "ns"
const testJobID = "j1"
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = annotations
hash := "3b2fc68e"
hash := "c3c0af0b"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
"taskmanager.numberOfTaskSlots: 16\n\n"+
"jobmanager.rpc.address: app-name-"+hash+"\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"OPERATOR_FLINK_CONFIG").Value)
"FLINK_PROPERTIES").Value)
case 2:
service := object.(*coreV1.Service)
assert.Equal(t, app.Name, service.Name)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
app.Spec.FlinkConfig = map[string]interface{}{
"high-availability": "zookeeper",
}
hash := "4a2f1a08"
hash := "52623ded"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
"high-availability.cluster-id: app-name-"+hash+"\n"+
"jobmanager.rpc.address: $HOST_IP\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"OPERATOR_FLINK_CONFIG").Value)
"FLINK_PROPERTIES").Value)
case 2:
service := object.(*coreV1.Service)
assert.Equal(t, app.Name, service.Name)
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/flink/task_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}

hash := "3b2fc68e"
hash := "c3c0af0b"

app.Annotations = annotations
expectedLabels := map[string]string{
Expand All @@ -87,7 +87,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
"jobmanager.rpc.address: app-name-"+hash+"\n"+
"taskmanager.host: $HOST_IP\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"OPERATOR_FLINK_CONFIG").Value)
"FLINK_PROPERTIES").Value)

return nil
}
Expand All @@ -107,7 +107,7 @@ func TestTaskManagerHACreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}

hash := "4a2f1a08"
hash := "52623ded"
app.Spec.FlinkConfig = map[string]interface{}{
"high-availability": "zookeeper",
}
Expand Down Expand Up @@ -135,6 +135,11 @@ func TestTaskManagerHACreateSuccess(t *testing.T) {
"taskmanager.numberOfTaskSlots: 16\n\n"+
"high-availability.cluster-id: app-name-"+hash+"\n"+
"taskmanager.host: $HOST_IP\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"FLINK_PROPERTIES").Value)
// backward compatibility: https://github.com/lyft/flinkk8soperator/issues/135
assert.Equal(t, common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"FLINK_PROPERTIES").Value,
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"OPERATOR_FLINK_CONFIG").Value)

Expand Down

0 comments on commit 2a5baaf

Please sign in to comment.