diff --git a/deploy/crd.yaml b/deploy/crd.yaml index ed120683..e83dcc13 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -97,6 +97,8 @@ spec: minimum: 1 offHeapMemoryFraction: type: number + minimum: 0 + maximum: 1 nodeSelector: type: object properties: @@ -222,6 +224,8 @@ spec: minimum: 1 offHeapMemoryFraction: type: number + minimum: 0 + maximum: 1 nodeSelector: type: object properties: diff --git a/docs/crd.md b/docs/crd.md index 2c45e67f..b863ce17 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -34,7 +34,9 @@ Below is the list of fields in the custom resource and their description * **offHeapMemoryFraction** `type:float64` A value between 0 and 1 that represents % of container memory dedicated to system / off heap. The - remaining memory is allocated for heap. + remaining memory is given to the taskmanager. Note that Flink may further reserve some of this + memory for off-heap uses like network buffers, so you may see the JVM heap size configured to + a lower amount. * **nodeSelector** `type:map[string]string` Configuration for the node selectors used for the task manager @@ -108,7 +110,7 @@ Below is the list of fields in the custom resource and their description * **volumeMounts** `type:[]v1.VolumeMount` Describes a mounting of a Volume within a container. - * **ForceRollback** `type:bool` + * **forceRollback** `type:bool` Can be set to true to force rollback a deploy/update. The rollback is **not** performed when the application is in a **RUNNING** phase. If an application is successfully rolled back, it is moved to a *DeployFailed* phase. Un-setting or setting `ForceRollback` to `False` will allow updates to progress normally. diff --git a/integ/test_app.yaml b/integ/test_app.yaml index a367fcd7..203afaa7 100644 --- a/integ/test_app.yaml +++ b/integ/test_app.yaml @@ -14,6 +14,7 @@ spec: state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints state.savepoints.dir: file:///checkpoints/flink/savepoints jobManagerConfig: + offHeapMemoryFraction: 0.2 resources: requests: memory: "200Mi" @@ -21,6 +22,7 @@ spec: replicas: 1 taskManagerConfig: taskSlots: 2 + offHeapMemoryFraction: 0.5 resources: requests: memory: "400Mi" diff --git a/pkg/controller/flink/config.go b/pkg/controller/flink/config.go index 17993444..dd885dc0 100644 --- a/pkg/controller/flink/config.go +++ b/pkg/controller/flink/config.go @@ -2,6 +2,7 @@ package flink import ( "fmt" + "math" "sort" "strings" @@ -84,20 +85,21 @@ func getJobManagerMemory(application *v1beta1.FlinkApplication) int64 { return jmMemory } -func getTaskManagerHeapMemory(app *v1beta1.FlinkApplication) float64 { +func computeHeap(memoryInBytes float64, fraction float64) string { + kbs := int64(math.Round(memoryInBytes-(memoryInBytes*fraction)) / 1024) + return fmt.Sprintf("%dk", kbs) +} + +func getTaskManagerHeapMemory(app *v1beta1.FlinkApplication) string { offHeapMemoryFrac := getValidFraction(app.Spec.TaskManagerConfig.OffHeapMemoryFraction, OffHeapMemoryDefaultFraction) tmMemory := float64(getTaskManagerMemory(app)) - heapMemoryBytes := tmMemory - (tmMemory * offHeapMemoryFrac) - heapMemoryMB := heapMemoryBytes / (1024 * 1024) - return heapMemoryMB + return computeHeap(tmMemory, offHeapMemoryFrac) } -func getJobManagerHeapMemory(app *v1beta1.FlinkApplication) float64 { +func getJobManagerHeapMemory(app *v1beta1.FlinkApplication) string { offHeapMemoryFrac := getValidFraction(app.Spec.JobManagerConfig.OffHeapMemoryFraction, OffHeapMemoryDefaultFraction) jmMemory := float64(getJobManagerMemory(app)) - heapMemoryBytes := jmMemory - (jmMemory * offHeapMemoryFrac) - heapMemoryMB := heapMemoryBytes / (1024 * 1024) - return heapMemoryMB + return computeHeap(jmMemory, offHeapMemoryFrac) } // Renders the flink configuration overrides stored in FlinkApplication.FlinkConfig into a diff --git a/pkg/controller/flink/config_test.go b/pkg/controller/flink/config_test.go index b4d57015..6f15baca 100644 --- a/pkg/controller/flink/config_test.go +++ b/pkg/controller/flink/config_test.go @@ -55,12 +55,12 @@ func TestRenderFlinkConfigOverrides(t *testing.T) { "akka.timeout: 5s", fmt.Sprintf("blob.server.port: %d", blobPort), "env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=39000 -XX:+UseG1GC", - "jobmanager.heap.size: 1536", // defaults + "jobmanager.heap.size: 1572864k", // defaults fmt.Sprintf("jobmanager.rpc.port: %d", RPCDefaultPort), fmt.Sprintf("jobmanager.web.port: %d", UIDefaultPort), fmt.Sprintf("metrics.internal.query-service.port: %d", MetricsQueryDefaultPort), fmt.Sprintf("query.server.port: %d", QueryDefaultPort), - "taskmanager.heap.size: 512", // defaults + "taskmanager.heap.size: 524288k", // defaults "taskmanager.network.memory.fraction: 0.1", "taskmanager.network.request-backoff.max: 5000", fmt.Sprintf("taskmanager.numberOfTaskSlots: %d", taskSlots), @@ -128,66 +128,59 @@ func TestGetJobManagerMemory(t *testing.T) { assert.Equal(t, expectedValue, getJobManagerMemory(&app)) } -func TestGetTaskManagerHeapMemory(t *testing.T) { +func TestEnsureNoFractionalHeapMemory(t *testing.T) { app := v1beta1.FlinkApplication{} tmResources := coreV1.ResourceRequirements{ Requests: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), - coreV1.ResourceMemory: resource.MustParse("1Mi"), + coreV1.ResourceMemory: resource.MustParse("64Mi"), }, Limits: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), - coreV1.ResourceMemory: resource.MustParse("1Mi"), + coreV1.ResourceMemory: resource.MustParse("64Mi"), }, } - offHeapMemoryFraction := float64(0.5) + offHeapMemoryFraction := float64(0.37) app.Spec.TaskManagerConfig.Resources = &tmResources app.Spec.TaskManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction - tmMemory := float64(getTaskManagerMemory(&app)) - expectedtmHeapMemoryMB := (tmMemory - tmMemory*offHeapMemoryFraction) / (1024 * 1024) - assert.Equal(t, expectedtmHeapMemoryMB, getTaskManagerHeapMemory(&app)) + assert.Equal(t, "41287k", getTaskManagerHeapMemory(&app)) } -func TestGetJobManagerHeapMemory(t *testing.T) { +func TestGetTaskManagerHeapMemory(t *testing.T) { app := v1beta1.FlinkApplication{} - jmResources := coreV1.ResourceRequirements{ + tmResources := coreV1.ResourceRequirements{ Requests: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), - coreV1.ResourceMemory: resource.MustParse("1Mi"), + coreV1.ResourceMemory: resource.MustParse("64Mi"), }, Limits: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), - coreV1.ResourceMemory: resource.MustParse("1Mi"), + coreV1.ResourceMemory: resource.MustParse("64Mi"), }, } offHeapMemoryFraction := float64(0.5) - app.Spec.JobManagerConfig.Resources = &jmResources - app.Spec.JobManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction + app.Spec.TaskManagerConfig.Resources = &tmResources + app.Spec.TaskManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction - jmMemory := float64(getJobManagerMemory(&app)) - expectedjmHeapMemoryMB := (jmMemory - jmMemory*offHeapMemoryFraction) / (1024 * 1024) - assert.Equal(t, expectedjmHeapMemoryMB, getJobManagerHeapMemory(&app)) + assert.Equal(t, "32768k", getTaskManagerHeapMemory(&app)) } -func TestInvalidMemoryFraction(t *testing.T) { +func TestGetJobManagerHeapMemory(t *testing.T) { app := v1beta1.FlinkApplication{} jmResources := coreV1.ResourceRequirements{ Requests: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), - coreV1.ResourceMemory: resource.MustParse("1Mi"), + coreV1.ResourceMemory: resource.MustParse("64Mi"), }, Limits: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), - coreV1.ResourceMemory: resource.MustParse("1Mi"), + coreV1.ResourceMemory: resource.MustParse("64Mi"), }, } - offHeapMemoryFraction := float64(1.5) + offHeapMemoryFraction := float64(0.5) app.Spec.JobManagerConfig.Resources = &jmResources app.Spec.JobManagerConfig.OffHeapMemoryFraction = &offHeapMemoryFraction - jmMemory := float64(getJobManagerMemory(&app)) - expectedjmHeapMemoryMB := (jmMemory - jmMemory*OffHeapMemoryDefaultFraction) / (1024 * 1024) - assert.Equal(t, expectedjmHeapMemoryMB, getJobManagerHeapMemory(&app)) - + assert.Equal(t, "32768k", getJobManagerHeapMemory(&app)) } diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index 871c6a6c..5cabaef2 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -31,7 +31,7 @@ import ( const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2" // Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change -const testAppHash = "c503e97e" +const testAppHash = "8c2d576f" const testAppName = "app-name" const testNamespace = "ns" const testJobID = "j1" diff --git a/pkg/controller/flink/job_manager_controller_test.go b/pkg/controller/flink/job_manager_controller_test.go index 245b4aba..a0bda43d 100644 --- a/pkg/controller/flink/job_manager_controller_test.go +++ b/pkg/controller/flink/job_manager_controller_test.go @@ -59,7 +59,7 @@ func TestJobManagerCreateSuccess(t *testing.T) { "flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"", } app.Annotations = annotations - hash := "d62c9c38" + hash := "3b2fc68e" expectedLabels := map[string]string{ "flink-app": "app-name", "flink-app-hash": hash, @@ -84,10 +84,10 @@ func TestJobManagerCreateSuccess(t *testing.T) { assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion) assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind) - assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1536\n"+ + assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+ "jobmanager.rpc.port: 6123\n"+ "jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+ - "query.server.port: 6124\ntaskmanager.heap.size: 512\n"+ + "query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+ "taskmanager.numberOfTaskSlots: 16\n\n"+ "jobmanager.rpc.address: app-name-"+hash+"\n", common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, @@ -136,7 +136,7 @@ func TestJobManagerHACreateSuccess(t *testing.T) { app.Spec.FlinkConfig = map[string]interface{}{ "high-availability": "zookeeper", } - hash := "063e33b7" + hash := "4a2f1a08" expectedLabels := map[string]string{ "flink-app": "app-name", "flink-app-hash": hash, @@ -161,10 +161,10 @@ func TestJobManagerHACreateSuccess(t *testing.T) { assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion) assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind) - assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1536\n"+ + assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1572864k\n"+ "jobmanager.rpc.port: 6123\n"+ "jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+ - "query.server.port: 6124\ntaskmanager.heap.size: 512\n"+ + "query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+ "taskmanager.numberOfTaskSlots: 16\n\n"+ "high-availability.cluster-id: app-name-"+hash+"\n"+ "jobmanager.rpc.address: $HOST_IP\n", diff --git a/pkg/controller/flink/task_manager_controller_test.go b/pkg/controller/flink/task_manager_controller_test.go index 3198162e..1a85b7a7 100644 --- a/pkg/controller/flink/task_manager_controller_test.go +++ b/pkg/controller/flink/task_manager_controller_test.go @@ -60,7 +60,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) { "flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"", } - hash := "d62c9c38" + hash := "3b2fc68e" app.Annotations = annotations expectedLabels := map[string]string{ @@ -79,10 +79,10 @@ func TestTaskManagerCreateSuccess(t *testing.T) { assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace) assert.Equal(t, expectedLabels, deployment.Labels) - assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1536\n"+ + assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+ "jobmanager.rpc.port: 6123\n"+ "jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+ - "query.server.port: 6124\ntaskmanager.heap.size: 512\n"+ + "query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+ "taskmanager.numberOfTaskSlots: 16\n\n"+ "jobmanager.rpc.address: app-name-"+hash+"\n"+ "taskmanager.host: $HOST_IP\n", @@ -107,7 +107,7 @@ func TestTaskManagerHACreateSuccess(t *testing.T) { "flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"", } - hash := "063e33b7" + hash := "4a2f1a08" app.Spec.FlinkConfig = map[string]interface{}{ "high-availability": "zookeeper", } @@ -128,10 +128,10 @@ func TestTaskManagerHACreateSuccess(t *testing.T) { assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace) assert.Equal(t, expectedLabels, deployment.Labels) - assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1536\n"+ + assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1572864k\n"+ "jobmanager.rpc.port: 6123\n"+ "jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+ - "query.server.port: 6124\ntaskmanager.heap.size: 512\n"+ + "query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+ "taskmanager.numberOfTaskSlots: 16\n\n"+ "high-availability.cluster-id: app-name-"+hash+"\n"+ "taskmanager.host: $HOST_IP\n",