Skip to content

Commit

Permalink
[#97] [STRMCMP-648] [BREAKING] Fix rendering of flink memory configs (#…
Browse files Browse the repository at this point in the history
…106)

Fix rendering of heap configs
  • Loading branch information
mwylde committed Sep 27, 2019
1 parent 674f2ca commit 77a7532
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 49 deletions.
4 changes: 4 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ spec:
minimum: 1
offHeapMemoryFraction:
type: number
minimum: 0
maximum: 1
nodeSelector:
type: object
properties:
Expand Down Expand Up @@ -222,6 +224,8 @@ spec:
minimum: 1
offHeapMemoryFraction:
type: number
minimum: 0
maximum: 1
nodeSelector:
type: object
properties:
Expand Down
6 changes: 4 additions & 2 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ Below is the list of fields in the custom resource and their description

* **offHeapMemoryFraction** `type:float64`
A value between 0 and 1 that represents % of container memory dedicated to system / off heap. The
remaining memory is allocated for heap.
remaining memory is given to the taskmanager. Note that Flink may further reserve some of this
memory for off-heap uses like network buffers, so you may see the JVM heap size configured to
a lower amount.

* **nodeSelector** `type:map[string]string`
Configuration for the node selectors used for the task manager
Expand Down Expand Up @@ -108,7 +110,7 @@ Below is the list of fields in the custom resource and their description
* **volumeMounts** `type:[]v1.VolumeMount`
Describes a mounting of a Volume within a container.

* **ForceRollback** `type:bool`
* **forceRollback** `type:bool`
Can be set to true to force rollback a deploy/update. The rollback is **not** performed when the application is in a **RUNNING** phase.
If an application is successfully rolled back, it is moved to a *DeployFailed* phase. Un-setting or setting `ForceRollback` to `False` will allow updates to progress normally.

2 changes: 2 additions & 0 deletions integ/test_app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ spec:
state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
state.savepoints.dir: file:///checkpoints/flink/savepoints
jobManagerConfig:
offHeapMemoryFraction: 0.2
resources:
requests:
memory: "200Mi"
cpu: "0.2"
replicas: 1
taskManagerConfig:
taskSlots: 2
offHeapMemoryFraction: 0.5
resources:
requests:
memory: "400Mi"
Expand Down
18 changes: 10 additions & 8 deletions pkg/controller/flink/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flink

import (
"fmt"
"math"
"sort"
"strings"

Expand Down Expand Up @@ -84,20 +85,21 @@ func getJobManagerMemory(application *v1beta1.FlinkApplication) int64 {
return jmMemory
}

func getTaskManagerHeapMemory(app *v1beta1.FlinkApplication) float64 {
func computeHeap(memoryInBytes float64, fraction float64) string {
kbs := int64(math.Round(memoryInBytes-(memoryInBytes*fraction)) / 1024)
return fmt.Sprintf("%dk", kbs)
}

func getTaskManagerHeapMemory(app *v1beta1.FlinkApplication) string {
offHeapMemoryFrac := getValidFraction(app.Spec.TaskManagerConfig.OffHeapMemoryFraction, OffHeapMemoryDefaultFraction)
tmMemory := float64(getTaskManagerMemory(app))
heapMemoryBytes := tmMemory - (tmMemory * offHeapMemoryFrac)
heapMemoryMB := heapMemoryBytes / (1024 * 1024)
return heapMemoryMB
return computeHeap(tmMemory, offHeapMemoryFrac)
}

func getJobManagerHeapMemory(app *v1beta1.FlinkApplication) float64 {
func getJobManagerHeapMemory(app *v1beta1.FlinkApplication) string {
offHeapMemoryFrac := getValidFraction(app.Spec.JobManagerConfig.OffHeapMemoryFraction, OffHeapMemoryDefaultFraction)
jmMemory := float64(getJobManagerMemory(app))
heapMemoryBytes := jmMemory - (jmMemory * offHeapMemoryFrac)
heapMemoryMB := heapMemoryBytes / (1024 * 1024)
return heapMemoryMB
return computeHeap(jmMemory, offHeapMemoryFrac)
}

// Renders the flink configuration overrides stored in FlinkApplication.FlinkConfig into a
Expand Down
45 changes: 19 additions & 26 deletions pkg/controller/flink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func TestRenderFlinkConfigOverrides(t *testing.T) {
"akka.timeout: 5s",
fmt.Sprintf("blob.server.port: %d", blobPort),
"env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=39000 -XX:+UseG1GC",
"jobmanager.heap.size: 1536", // defaults
"jobmanager.heap.size: 1572864k", // defaults
fmt.Sprintf("jobmanager.rpc.port: %d", RPCDefaultPort),
fmt.Sprintf("jobmanager.web.port: %d", UIDefaultPort),
fmt.Sprintf("metrics.internal.query-service.port: %d", MetricsQueryDefaultPort),
fmt.Sprintf("query.server.port: %d", QueryDefaultPort),
"taskmanager.heap.size: 512", // defaults
"taskmanager.heap.size: 524288k", // defaults
"taskmanager.network.memory.fraction: 0.1",
"taskmanager.network.request-backoff.max: 5000",
fmt.Sprintf("taskmanager.numberOfTaskSlots: %d", taskSlots),
Expand Down Expand Up @@ -128,66 +128,59 @@ func TestGetJobManagerMemory(t *testing.T) {
assert.Equal(t, expectedValue, getJobManagerMemory(&app))
}

func TestGetTaskManagerHeapMemory(t *testing.T) {
func TestEnsureNoFractionalHeapMemory(t *testing.T) {
app := v1beta1.FlinkApplication{}
tmResources := coreV1.ResourceRequirements{
Requests: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("1Mi"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("1Mi"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
}
offHeapMemoryFraction := float64(0.5)
offHeapMemoryFraction := float64(0.37)
app.Spec.TaskManagerConfig.Resources = &tmResources
app.Spec.TaskManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction

tmMemory := float64(getTaskManagerMemory(&app))
expectedtmHeapMemoryMB := (tmMemory - tmMemory*offHeapMemoryFraction) / (1024 * 1024)
assert.Equal(t, expectedtmHeapMemoryMB, getTaskManagerHeapMemory(&app))
assert.Equal(t, "41287k", getTaskManagerHeapMemory(&app))
}

func TestGetJobManagerHeapMemory(t *testing.T) {
func TestGetTaskManagerHeapMemory(t *testing.T) {
app := v1beta1.FlinkApplication{}
jmResources := coreV1.ResourceRequirements{
tmResources := coreV1.ResourceRequirements{
Requests: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("1Mi"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("1Mi"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
}
offHeapMemoryFraction := float64(0.5)
app.Spec.JobManagerConfig.Resources = &jmResources
app.Spec.JobManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction
app.Spec.TaskManagerConfig.Resources = &tmResources
app.Spec.TaskManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction

jmMemory := float64(getJobManagerMemory(&app))
expectedjmHeapMemoryMB := (jmMemory - jmMemory*offHeapMemoryFraction) / (1024 * 1024)
assert.Equal(t, expectedjmHeapMemoryMB, getJobManagerHeapMemory(&app))
assert.Equal(t, "32768k", getTaskManagerHeapMemory(&app))
}

func TestInvalidMemoryFraction(t *testing.T) {
func TestGetJobManagerHeapMemory(t *testing.T) {
app := v1beta1.FlinkApplication{}
jmResources := coreV1.ResourceRequirements{
Requests: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("1Mi"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: coreV1.ResourceList{
coreV1.ResourceCPU: resource.MustParse("2"),
coreV1.ResourceMemory: resource.MustParse("1Mi"),
coreV1.ResourceMemory: resource.MustParse("64Mi"),
},
}
offHeapMemoryFraction := float64(1.5)
offHeapMemoryFraction := float64(0.5)
app.Spec.JobManagerConfig.Resources = &jmResources
app.Spec.JobManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction

jmMemory := float64(getJobManagerMemory(&app))
expectedjmHeapMemoryMB := (jmMemory - jmMemory*OffHeapMemoryDefaultFraction) / (1024 * 1024)
assert.Equal(t, expectedjmHeapMemoryMB, getJobManagerHeapMemory(&app))

assert.Equal(t, "32768k", getJobManagerHeapMemory(&app))
}
2 changes: 1 addition & 1 deletion pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"

// Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
const testAppHash = "c503e97e"
const testAppHash = "8c2d576f"
const testAppName = "app-name"
const testNamespace = "ns"
const testJobID = "j1"
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = annotations
hash := "d62c9c38"
hash := "3b2fc68e"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand All @@ -84,10 +84,10 @@ func TestJobManagerCreateSuccess(t *testing.T) {
assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion)
assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind)

assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1536\n"+
assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"jobmanager.rpc.address: app-name-"+hash+"\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
app.Spec.FlinkConfig = map[string]interface{}{
"high-availability": "zookeeper",
}
hash := "063e33b7"
hash := "4a2f1a08"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand All @@ -161,10 +161,10 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion)
assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind)

assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1536\n"+
assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"high-availability.cluster-id: app-name-"+hash+"\n"+
"jobmanager.rpc.address: $HOST_IP\n",
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/flink/task_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}

hash := "d62c9c38"
hash := "3b2fc68e"

app.Annotations = annotations
expectedLabels := map[string]string{
Expand All @@ -79,10 +79,10 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace)
assert.Equal(t, expectedLabels, deployment.Labels)

assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1536\n"+
assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"jobmanager.rpc.address: app-name-"+hash+"\n"+
"taskmanager.host: $HOST_IP\n",
Expand All @@ -107,7 +107,7 @@ func TestTaskManagerHACreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}

hash := "063e33b7"
hash := "4a2f1a08"
app.Spec.FlinkConfig = map[string]interface{}{
"high-availability": "zookeeper",
}
Expand All @@ -128,10 +128,10 @@ func TestTaskManagerHACreateSuccess(t *testing.T) {
assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace)
assert.Equal(t, expectedLabels, deployment.Labels)

assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1536\n"+
assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"high-availability.cluster-id: app-name-"+hash+"\n"+
"taskmanager.host: $HOST_IP\n",
Expand Down

0 comments on commit 77a7532

Please sign in to comment.