Skip to content

Commit

Permalink
Enabling PodSecurityContext to be configured for JM and TM pod templa…
Browse files Browse the repository at this point in the history
…tes (#158)

* Enabling PodSecurityContext to be configured for JobManager and TaskManager pods
* Updating the CRD to enable pod security context to be specified and applied to JM and TM pods.
* Adding security context fields to crd validation.
* Changing security context injection to maintain hash-based backwards compatibility.
* Adding tests to ensure that security context is being injected properly.
* Addressing linter feedback (#3)
  • Loading branch information
kelly-sm authored and anandswaminathan committed Jan 15, 2020
1 parent e9054a4 commit f8b0a12
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 10 deletions.
23 changes: 23 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ spec:
type: string
serviceAccountName:
type: string
securityContext:
type: object
properties:
fsGroup:
type: integer
minimum: 1
maximum: 65535
runAsGroup:
type: integer
minimum: 1
maximum: 65535
runAsNonRoot:
type: boolean
runAsUser:
type: integer
minimum: 1
maximum: 65535
supplementalGroups:
type: array
items:
type: integer
minimum: 1
maximum: 65535
jarName:
type: string
programArgs:
Expand Down
6 changes: 6 additions & 0 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ Below is the list of fields in the custom resource and their description
* **imagePullSecrets** `type:[]v1.LocalObjectReference`
Indicates name of Secrets, Kubernetes should get the credentials from.

* **serviceAccountName** `type:string`
Pods created for this Flink application will run with the provided service account (which must already exist in the namespace).

* **securityContext** `type:v1.PodSecurityContext`
This allows you to specify pod-level security attributes which will be applied to both job manager and task manager pods created for this Flink application. More information can be found in the [Kubernetes documentation](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/) or the [API spec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podsecuritycontext-v1-core).

* **taskManagerConfig** `type:TaskManagerConfig required=true`
Configuration for the Flink task manager

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type FlinkApplicationSpec struct {
ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"`
ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`
FlinkConfig FlinkConfig `json:"flinkConfig"`
FlinkVersion string `json:"flinkVersion"`
TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const testAppName = "app-name"
const testNamespace = "ns"
const testJobID = "j1"
const testFlinkVersion = "1.7"
const testJarName = "test.jar"
const testEntryClass = "com.test.MainClass"
const testProgramArgs = "--test"

func getTestFlinkController() Controller {
testScope := mockScope.NewTestScope()
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/flink/job_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
if serviceAccountName != "" {
deployment.Spec.Template.Spec.ServiceAccountName = serviceAccountName
}

if app.Spec.SecurityContext != nil {
deployment.Spec.Template.Spec.SecurityContext = app.Spec.SecurityContext
}

return deployment
}

Expand Down
51 changes: 47 additions & 4 deletions pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)

var testJarName = "test.jar"
var testEntryClass = "com.test.MainClass"
var testProgramArgs = "--test"

func getJMControllerForTest() JobManagerController {
testScope := mockScope.NewTestScope()
labeled.SetMetricKeys(common.GetValidLabelNames()...)
Expand Down Expand Up @@ -197,6 +193,53 @@ func TestJobManagerHACreateSuccess(t *testing.T) {
assert.True(t, newlyCreated)
}

func TestJobManagerSecurityContextAssignment(t *testing.T) {
err := initTestConfigForIngress()
assert.Nil(t, err)
testController := getJMControllerForTest()
app := getFlinkTestApp()
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs

fsGroup := int64(2000)
runAsUser := int64(1000)
runAsGroup := int64(3000)
runAsNonRoot := bool(true)

app.Spec.SecurityContext = &coreV1.PodSecurityContext{
FSGroup: &fsGroup,
RunAsUser: &runAsUser,
RunAsGroup: &runAsGroup,
RunAsNonRoot: &runAsNonRoot,
}

hash := "c06b960b"

ctr := 0
mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.CreateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
ctr++
switch ctr {
case 1:
deployment := object.(*v1.Deployment)
assert.Equal(t, getJobManagerName(&app, hash), deployment.Name)

appSc := app.Spec.SecurityContext
depSc := deployment.Spec.Template.Spec.SecurityContext

assert.Equal(t, *appSc.FSGroup, *depSc.FSGroup)
assert.Equal(t, *appSc.RunAsUser, *depSc.RunAsUser)
assert.Equal(t, *appSc.RunAsGroup, *depSc.RunAsGroup)
assert.Equal(t, *appSc.RunAsNonRoot, *depSc.RunAsNonRoot)
}
return nil
}
newlyCreated, err := testController.CreateIfNotExist(context.Background(), &app)
assert.Nil(t, err)
assert.True(t, newlyCreated)
}

func TestJobManagerCreateErr(t *testing.T) {
testController := getJMControllerForTest()
app := getFlinkTestApp()
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/flink/task_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
if serviceAccountName != "" {
deployment.Spec.Template.Spec.ServiceAccountName = serviceAccountName
}

if app.Spec.SecurityContext != nil {
deployment.Spec.Template.Spec.SecurityContext = app.Spec.SecurityContext
}

return deployment
}

Expand Down
54 changes: 48 additions & 6 deletions pkg/controller/flink/task_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/apps/v1"
coreV1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -52,9 +53,9 @@ func TestGetTaskManagerPodName(t *testing.T) {
func TestTaskManagerCreateSuccess(t *testing.T) {
testController := getTMControllerForTest()
app := getFlinkTestApp()
app.Spec.JarName = "test.jar"
app.Spec.EntryClass = "com.test.MainClass"
app.Spec.ProgramArgs = "--test"
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs
annotations := map[string]string{
"key": "annotation",
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
Expand Down Expand Up @@ -99,9 +100,9 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
func TestTaskManagerHACreateSuccess(t *testing.T) {
testController := getTMControllerForTest()
app := getFlinkTestApp()
app.Spec.JarName = "test.jar"
app.Spec.EntryClass = "com.test.MainClass"
app.Spec.ProgramArgs = "--test"
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs
annotations := map[string]string{
"key": "annotation",
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
Expand Down Expand Up @@ -150,6 +151,47 @@ func TestTaskManagerHACreateSuccess(t *testing.T) {
assert.True(t, newlyCreated)
}

func TestTaskManagerSecurityContextAssignment(t *testing.T) {
testController := getTMControllerForTest()
app := getFlinkTestApp()
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs

fsGroup := int64(2000)
runAsUser := int64(1000)
runAsGroup := int64(3000)
runAsNonRoot := bool(true)

app.Spec.SecurityContext = &coreV1.PodSecurityContext{
FSGroup: &fsGroup,
RunAsUser: &runAsUser,
RunAsGroup: &runAsGroup,
RunAsNonRoot: &runAsNonRoot,
}

hash := "c06b960b"

mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.CreateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
deployment := object.(*v1.Deployment)
assert.Equal(t, getTaskManagerName(&app, hash), deployment.Name)

appSc := app.Spec.SecurityContext
depSc := deployment.Spec.Template.Spec.SecurityContext

assert.Equal(t, *appSc.FSGroup, *depSc.FSGroup)
assert.Equal(t, *appSc.RunAsUser, *depSc.RunAsUser)
assert.Equal(t, *appSc.RunAsGroup, *depSc.RunAsGroup)
assert.Equal(t, *appSc.RunAsNonRoot, *depSc.RunAsNonRoot)

return nil
}
newlyCreated, err := testController.CreateIfNotExist(context.Background(), &app)
assert.Nil(t, err)
assert.True(t, newlyCreated)
}

func TestTaskManagerCreateErr(t *testing.T) {
testController := getTMControllerForTest()
app := getFlinkTestApp()
Expand Down

0 comments on commit f8b0a12

Please sign in to comment.